algebra/crdt.rs
1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9//! **State-based CRDTs** built on lattice primitives.
10//!
11//! This module provides convergent replicated data types implemented in
12//! terms of [`JoinSemilattice`] / [`BoundedJoinSemilattice`]. All types
13//! here are *state-based* CRDTs: replicas exchange full lattice states
14//! and merge them with `join`, guaranteeing convergence under arbitrary
15//! message reordering and duplication.
16//!
17//! # Included Types
18//!
19//! - [`LWW<T>`]: a **Last-Writer-Wins register** lattice storing
20//! `(value, ts, replica)` and resolving conflicts by picking the value
21//! with the larger timestamp; equal timestamps use replica ID as
22//! a deterministic tiebreaker to ensure commutativity.
23//!
24//! # Example
25//!
26//! ```rust
27//! use algebra::JoinSemilattice;
28//! use algebra::LWW;
29//!
30//! // Replica 1 writes value 100 at timestamp 1
31//! let v1 = LWW::new(100, 1, 1);
32//!
33//! // Replica 2 writes value 50 at timestamp 2
34//! let v2 = LWW::new(50, 2, 2);
35//!
36//! // Higher timestamp wins
37//! let merged = v1.join(&v2);
38//! assert_eq!(merged.value, 50);
39//! assert_eq!(merged.ts, 2);
40//! ```
41
42use std::cmp::Ordering;
43
44use serde::Deserialize;
45use serde::Serialize;
46
47use super::BoundedJoinSemilattice;
48use super::JoinSemilattice;
49
50/// A **Last-Writer-Wins register** lattice.
51///
52/// The state is a triple `(value, ts, replica)` where `ts` is a logical
53/// timestamp (e.g. Lamport clock, HLC, or monotone counter) and `replica`
54/// is a unique identifier for the writer. Ordering uses `(ts, replica)`
55/// lexicographically, yielding a total order on register versions; `join`
56/// returns the greater version and is commutative, associative, and
57/// idempotent.
58///
59/// This makes `LWW<T>` a simple register-style lattice that can be used
60/// as the payload in higher-level CRDTs or accumulators where "latest
61/// value" semantics are needed.
62///
63/// # Properties
64///
65/// - **Commutative**: `a.join(b) == b.join(a)`
66/// - **Associative**: `a.join(b).join(c) == a.join(b.join(c))`
67/// - **Idempotent**: `a.join(a) == a`
68///
69/// # Use Cases
70///
71/// - Distributed caches (last-write-wins per key)
72/// - Configuration management (latest config wins)
73/// - Watermark tracking where ranks can report decreasing values
74/// (failure recovery, reprocessing)
75///
76/// # Example
77///
78/// ```
79/// use algebra::JoinSemilattice;
80/// use algebra::LWW;
81///
82/// // Two writers (replicas 1 and 2) with different timestamps
83/// let v1 = LWW::new(100, 1, 1);
84/// let v2 = LWW::new(50, 2, 2);
85///
86/// // Higher timestamp wins, even if value is smaller
87/// assert_eq!(v1.join(&v2), LWW::new(50, 2, 2));
88/// ```
89#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
90pub struct LWW<T> {
91 /// The current value of the register.
92 pub value: T,
93 /// The logical timestamp associated with this value.
94 pub ts: u64,
95 /// The replica ID of the writer (for deterministic tie-breaking).
96 pub replica: u64,
97}
98
99impl<T: Clone + PartialEq> JoinSemilattice for LWW<T> {
100 fn join(&self, other: &Self) -> Self {
101 match self.ts.cmp(&other.ts) {
102 Ordering::Greater => self.clone(),
103 Ordering::Less => other.clone(),
104 Ordering::Equal => {
105 // Tie-break by replica: higher replica wins
106 if self.replica > other.replica {
107 self.clone()
108 } else if other.replica > self.replica {
109 other.clone()
110 } else {
111 // Same (ts, replica) should mean same write
112 // (duplicate delivery)
113 debug_assert!(
114 self.value == other.value,
115 "LWW collision: same (ts, replica) but different values"
116 );
117 self.clone()
118 }
119 }
120 }
121 }
122}
123
124impl<T: Clone + PartialEq + Default> BoundedJoinSemilattice for LWW<T> {
125 fn bottom() -> Self {
126 LWW {
127 value: T::default(),
128 ts: 0,
129 replica: 0,
130 }
131 }
132}
133
134impl<T> LWW<T> {
135 /// Create a new LWW register with the given value, timestamp, and
136 /// replica ID.
137 pub fn new(value: T, ts: u64, replica: u64) -> Self {
138 LWW { value, ts, replica }
139 }
140
141 /// Get the current value.
142 pub fn get(&self) -> &T {
143 &self.value
144 }
145
146 /// Get the timestamp.
147 pub fn timestamp(&self) -> u64 {
148 self.ts
149 }
150
151 /// Get the replica ID.
152 pub fn replica(&self) -> u64 {
153 self.replica
154 }
155}
156
157#[cfg(test)]
158mod tests {
159 use super::*;
160
161 #[test]
162 fn lww_join_takes_higher_timestamp() {
163 let v1 = LWW::new(100, 1, 0);
164 let v2 = LWW::new(50, 2, 0);
165
166 // Higher timestamp wins even with smaller value
167 assert_eq!(v1.join(&v2), LWW::new(50, 2, 0));
168 assert_eq!(v2.join(&v1), LWW::new(50, 2, 0));
169 }
170
171 #[test]
172 fn lww_join_on_tie_higher_replica_wins() {
173 let v1 = LWW::new(100, 1, 1);
174 let v2 = LWW::new(200, 1, 2);
175
176 // Same timestamp: higher replica wins (commutative)
177 assert_eq!(v1.join(&v2), v2);
178 assert_eq!(v2.join(&v1), v2);
179 }
180
181 #[test]
182 fn lww_is_idempotent() {
183 let v = LWW::new(42, 5, 1);
184 assert_eq!(v.join(&v), v);
185 }
186
187 #[test]
188 fn lww_is_commutative() {
189 let v1 = LWW::new(10, 1, 1);
190 let v2 = LWW::new(20, 2, 2);
191 let v3 = LWW::new(30, 3, 3);
192
193 assert_eq!(v1.join(&v2), v2.join(&v1));
194 assert_eq!(v1.join(&v2).join(&v3), v3.join(&v2).join(&v1));
195
196 // Also test commutativity with same timestamp, different replica
197 let a = LWW::new(100, 5, 1);
198 let b = LWW::new(200, 5, 2);
199 assert_eq!(a.join(&b), b.join(&a));
200 }
201
202 #[test]
203 fn lww_is_associative() {
204 let v1 = LWW::new(10, 1, 1);
205 let v2 = LWW::new(20, 2, 2);
206 let v3 = LWW::new(30, 3, 3);
207
208 assert_eq!(v1.join(&v2).join(&v3), v1.join(&v2.join(&v3)));
209
210 // Also test with same timestamp
211 let a = LWW::new(10, 5, 1);
212 let b = LWW::new(20, 5, 2);
213 let c = LWW::new(30, 5, 3);
214 assert_eq!(a.join(&b).join(&c), a.join(&b.join(&c)));
215 }
216
217 #[test]
218 fn lww_bottom_is_identity() {
219 let v = LWW::new(42, 5, 1);
220 let bottom = LWW::<i32>::bottom();
221
222 assert_eq!(bottom.join(&v), v);
223 assert_eq!(v.join(&bottom), v);
224 }
225
226 #[test]
227 fn lww_serde_roundtrip() {
228 let v = LWW::new(42, 12345, 99);
229 let encoded = bincode::serialize(&v).unwrap();
230 let decoded: LWW<i32> = bincode::deserialize(&encoded).unwrap();
231 assert_eq!(decoded, v);
232 }
233}