1use log::*;
10
11use super::*;
12
13struct DurableMailboxSender(Buffer<MessageEnvelope>);
17
18impl DurableMailboxSender {
19 fn new(
20 write_ahead_log: impl MessageLog<MessageEnvelope> + 'static,
21 inner: impl MailboxSender + 'static,
22 ) -> Self {
23 let write_ahead_log = Arc::new(tokio::sync::Mutex::new(write_ahead_log));
24 let inner = Arc::new(inner);
25 let sequencer =
26 Buffer::new(
27 move |envelope: MessageEnvelope,
28 return_handle: PortHandle<Undeliverable<MessageEnvelope>>| {
29 let write_ahead_log = write_ahead_log.clone();
30 let inner = inner.clone();
31 let return_handle = return_handle.clone();
32 async move {
33 let envelope_copy = envelope.clone(); let port_id = envelope.dest().clone();
35 let mut log = write_ahead_log.lock().await;
36 let append_result = log.append(envelope).await.map_err(|err| {
39 MailboxSenderError::new_bound(port_id.clone(), err.into())
40 });
41
42 let flush_result = log.flush().await.map_err(|err| {
43 MailboxSenderError::new_bound(port_id.clone(), err.into())
44 });
45
46 drop(log);
47
48 if append_result.and(flush_result).is_ok() {
49 inner.post(envelope_copy, return_handle);
50 } else {
51 envelope_copy.undeliverable(
52 DeliveryError::BrokenLink(
53 "failed to append or flush in durable sender".to_string(),
54 ),
55 return_handle,
56 );
57 }
58 }
59 },
60 );
61
62 Self(sequencer)
63 }
64
65 async fn flush(&mut self) -> Result<(), watch::error::RecvError> {
66 self.0.flush().await
67 }
68}
69
70#[async_trait]
71impl MailboxSender for DurableMailboxSender {
72 fn post_unchecked(
73 &self,
74 envelope: MessageEnvelope,
75 return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
76 ) {
77 if let Err(mpsc::error::SendError((envelope, return_handle))) =
78 self.0.send((envelope, return_handle))
79 {
80 envelope.undeliverable(
81 DeliveryError::BrokenLink("failed to post in DurableMailboxSender".to_string()),
82 return_handle,
83 );
84 }
85 }
86}
87
88pub mod log {
89
90 use std::fmt::Debug;
94
95 use async_trait::async_trait;
96 use futures::stream::Stream;
97
98 use crate::RemoteMessage;
99
100 pub type SeqId = u64;
102
103 #[derive(thiserror::Error, Debug)]
106 #[non_exhaustive]
107 pub enum MessageLogError {
108 #[error("flush: [{0}, {1})")]
110 Flush(SeqId, SeqId, #[source] anyhow::Error),
111
112 #[error("append: {0}")]
114 Append(SeqId, #[source] anyhow::Error),
115
116 #[error("read: {0}")]
118 Read(SeqId, #[source] anyhow::Error),
119
120 #[error("trim: {0}")]
122 Trim(SeqId, #[source] anyhow::Error),
123
124 #[error(transparent)]
126 Other(#[from] anyhow::Error),
127 }
128
129 #[async_trait]
135 pub trait MessageLog<M: RemoteMessage>: Sync + Send {
136 type Stream<'a>: Stream<Item = Result<(SeqId, M), MessageLogError>> + Send
138 where
139 Self: 'a;
140
141 async fn append(&mut self, message: M) -> Result<(), MessageLogError>;
144
145 async fn flush(&mut self) -> Result<SeqId, MessageLogError>;
147
148 async fn append_and_flush(&mut self, message: &M) -> Result<SeqId, MessageLogError>;
151
152 async fn trim(&mut self, new_start: SeqId) -> Result<(), MessageLogError>;
154
155 async fn read(&self, from: SeqId) -> Result<Self::Stream<'_>, MessageLogError>;
159
160 async fn read_one(&self, seq_id: SeqId) -> Result<M, MessageLogError>;
164 }
165}
166
167pub mod test_utils {
169
170 use std::collections::VecDeque;
171
172 use futures::pin_mut;
173 use log::SeqId;
174 use tokio_stream::StreamExt;
175
176 use super::*;
177
178 #[derive(Clone)]
180 pub struct TestLog<M: RemoteMessage> {
181 queue: Arc<Mutex<VecDeque<(SeqId, M)>>>,
182 current_seq_id: Arc<Mutex<SeqId>>,
183 observer: Option<mpsc::UnboundedSender<(String, M)>>,
185 }
186
187 impl<M: RemoteMessage> Default for TestLog<M> {
188 fn default() -> Self {
189 Self::new()
190 }
191 }
192
193 impl<M: RemoteMessage> TestLog<M> {
194 pub fn new() -> Self {
196 Self {
197 queue: Arc::new(Mutex::new(VecDeque::new())),
198 current_seq_id: Arc::new(Mutex::new(0)),
199 observer: None,
200 }
201 }
202
203 pub fn new_with_observer(observer: mpsc::UnboundedSender<(String, M)>) -> Self {
207 Self {
208 queue: Arc::new(Mutex::new(VecDeque::new())),
209 current_seq_id: Arc::new(Mutex::new(0)),
210 observer: Some(observer),
211 }
212 }
213 }
214
215 #[async_trait]
216 impl<M: RemoteMessage + Clone> MessageLog<M> for TestLog<M> {
217 type Stream<'a> =
218 futures::stream::Iter<std::vec::IntoIter<Result<(SeqId, M), MessageLogError>>>;
219
220 async fn append(&mut self, message: M) -> Result<(), MessageLogError> {
221 let mut seq_id = self.current_seq_id.lock().unwrap();
222 self.queue
223 .lock()
224 .unwrap()
225 .push_back((*seq_id, message.clone()));
226 *seq_id += 1;
227 if let Some(observer) = &self.observer {
228 observer.send(("append".to_string(), message)).unwrap();
229 }
230 Ok(())
231 }
232
233 async fn flush(&mut self) -> Result<SeqId, MessageLogError> {
234 let seq_id = *self.current_seq_id.lock().unwrap();
235 Ok(seq_id)
236 }
237
238 async fn append_and_flush(&mut self, message: &M) -> Result<SeqId, MessageLogError> {
239 self.append(message.clone()).await?;
240 self.flush().await
241 }
242
243 async fn trim(&mut self, new_start: SeqId) -> Result<(), MessageLogError> {
244 let mut queue = self.queue.lock().unwrap();
245 while let Some((id, _)) = queue.front() {
246 if *id < new_start {
247 queue.pop_front();
248 } else {
249 break;
250 }
251 }
252 Ok(())
253 }
254
255 async fn read(&self, seq_id: SeqId) -> Result<Self::Stream<'_>, MessageLogError> {
256 let queue = self.queue.lock().unwrap();
257 let filtered_items: Vec<_> = queue
258 .iter()
259 .filter(move |(id, _)| *id >= seq_id)
260 .map(|(seq_id, msg)| Ok((*seq_id, msg.clone())))
261 .collect();
262 for entry in filtered_items.iter() {
263 if let Some(observer) = &self.observer
264 && let Ok((_, msg)) = entry.as_ref()
265 {
266 observer.send(("read".to_string(), msg.clone())).unwrap();
267 }
268 }
269 Ok(futures::stream::iter(filtered_items))
270 }
271
272 async fn read_one(&self, seq_id: SeqId) -> Result<M, MessageLogError> {
273 let it = self.read(seq_id).await?;
274
275 pin_mut!(it);
276 match it.next().await {
277 Some(Ok((result_seq_id, message))) => {
278 if result_seq_id != seq_id {
279 panic!("no seq id {}", seq_id);
280 }
281 return Ok(message);
282 }
283 Some(Err(err)) => {
284 return Err(err);
285 }
286 None => {
287 return Err(MessageLogError::Read(
288 seq_id,
289 anyhow::anyhow!("failed to find message with sequence {}", seq_id),
290 ));
291 }
292 }
293 }
294 }
295}
296
#[cfg(test)]
mod tests {

    use std::assert_matches::assert_matches;
    use std::mem::drop;

    use futures::StreamExt;

    use super::test_utils::TestLog;
    use super::*;
    use crate::id;
    use crate::mailbox::log::SeqId;

    /// Exercises append, flush, read and trim on the in-memory `TestLog`.
    #[tokio::test]
    async fn test_local_write_ahead_log_basic() {
        let mut wal = TestLog::new();
        for value in [124u64, 56u64] {
            wal.append(value).await.unwrap();
        }
        // Three entries (ids 0..=2) exist after this call.
        assert_eq!(wal.append_and_flush(&999u64).await.unwrap(), 3);

        // Reading from id 1 skips the first entry.
        let mut entries = wal.read(1).await.unwrap();
        let entry: (SeqId, u64) = entries.next().await.unwrap().unwrap();
        assert_eq!(entry, (1, 56u64));
        let entry = entries.next().await.unwrap().unwrap();
        assert_eq!(entry, (2, 999u64));
        assert_matches!(entries.next().await, None);
        drop(entries);

        // Trimming below id 2 must not disturb later ids.
        wal.trim(2).await.unwrap();
        assert_eq!(wal.append_and_flush(&777u64).await.unwrap(), 4);

        let mut entries = wal.read(2).await.unwrap();
        let entry: (SeqId, u64) = entries.next().await.unwrap().unwrap();
        assert_eq!(entry, (2, 999u64));
        let entry = entries.next().await.unwrap().unwrap();
        assert_eq!(entry, (3, 777u64));
        assert_matches!(entries.next().await, None);
    }

    /// Posts three messages through the durable sender and checks both
    /// delivery to the inner mailbox and the contents of the log.
    #[tokio::test]
    async fn test_durable_mailbox_sender() {
        let inner = Mailbox::new_detached(id!(world0[0].actor0));
        let write_ahead_log = TestLog::new();
        let mut durable_mbox = DurableMailboxSender::new(write_ahead_log.clone(), inner.clone());

        let (port1, mut receiver1) = inner.open_port::<u64>();
        let (port2, mut _receiver2) = inner.open_port::<u64>();

        let port1 = port1.bind();
        let port2 = port2.bind();

        // Destination and payload of each message, in posting order.
        let deliveries = [
            (port1.port_id().clone(), 1u64),
            (port2.port_id().clone(), 2u64),
            (port1.port_id().clone(), 3u64),
        ];
        for (dest, payload) in deliveries {
            durable_mbox.post(
                MessageEnvelope::new_unknown(dest, wirevalue::Any::serialize(&payload).unwrap()),
                monitored_return_handle(),
            );
        }
        assert_eq!(receiver1.recv().await.unwrap(), 1u64);

        durable_mbox.flush().await.unwrap();

        // Entry 0 is the first message; start from the second one.
        let mut logged = write_ahead_log.read(1).await.unwrap();
        let (seq, message): (SeqId, MessageEnvelope) = logged.next().await.unwrap().unwrap();
        assert_eq!(seq, 1);
        assert_eq!(port2.port_id(), message.dest());
        assert_eq!(2u64, message.deserialized::<u64>().unwrap());
        let (seq, message): (SeqId, MessageEnvelope) = logged.next().await.unwrap().unwrap();
        assert_eq!(seq, 2);
        assert_eq!(3u64, message.deserialized::<u64>().unwrap());
    }
}