algebra/
crdt.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9//! **State-based CRDTs** built on lattice primitives.
10//!
11//! This module provides convergent replicated data types implemented in
12//! terms of [`JoinSemilattice`] / [`BoundedJoinSemilattice`]. All types
13//! here are *state-based* CRDTs: replicas exchange full lattice states
14//! and merge them with `join`, guaranteeing convergence under arbitrary
15//! message reordering and duplication.
16//!
17//! # Included Types
18//!
19//! - [`LWW<T>`]: a **Last-Writer-Wins register** lattice storing
20//!   `(value, ts, replica)` and resolving conflicts by picking the value
21//!   with the larger timestamp; equal timestamps use replica ID as
22//!   a deterministic tiebreaker to ensure commutativity.
23//!
24//! # Example
25//!
26//! ```rust
27//! use algebra::JoinSemilattice;
28//! use algebra::LWW;
29//!
30//! // Replica 1 writes value 100 at timestamp 1
31//! let v1 = LWW::new(100, 1, 1);
32//!
33//! // Replica 2 writes value 50 at timestamp 2
34//! let v2 = LWW::new(50, 2, 2);
35//!
36//! // Higher timestamp wins
37//! let merged = v1.join(&v2);
38//! assert_eq!(merged.value, 50);
39//! assert_eq!(merged.ts, 2);
40//! ```
41
42use std::cmp::Ordering;
43
44use serde::Deserialize;
45use serde::Serialize;
46
47use super::BoundedJoinSemilattice;
48use super::JoinSemilattice;
49
50/// A **Last-Writer-Wins register** lattice.
51///
52/// The state is a triple `(value, ts, replica)` where `ts` is a logical
53/// timestamp (e.g. Lamport clock, HLC, or monotone counter) and `replica`
54/// is a unique identifier for the writer. Ordering uses `(ts, replica)`
55/// lexicographically, yielding a total order on register versions; `join`
56/// returns the greater version and is commutative, associative, and
57/// idempotent.
58///
59/// This makes `LWW<T>` a simple register-style lattice that can be used
60/// as the payload in higher-level CRDTs or accumulators where "latest
61/// value" semantics are needed.
62///
63/// # Properties
64///
65/// - **Commutative**: `a.join(b) == b.join(a)`
66/// - **Associative**: `a.join(b).join(c) == a.join(b.join(c))`
67/// - **Idempotent**: `a.join(a) == a`
68///
69/// # Use Cases
70///
71/// - Distributed caches (last-write-wins per key)
72/// - Configuration management (latest config wins)
73/// - Watermark tracking where ranks can report decreasing values
74///   (failure recovery, reprocessing)
75///
76/// # Example
77///
78/// ```
79/// use algebra::JoinSemilattice;
80/// use algebra::LWW;
81///
82/// // Two writers (replicas 1 and 2) with different timestamps
83/// let v1 = LWW::new(100, 1, 1);
84/// let v2 = LWW::new(50, 2, 2);
85///
86/// // Higher timestamp wins, even if value is smaller
87/// assert_eq!(v1.join(&v2), LWW::new(50, 2, 2));
88/// ```
89#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
90pub struct LWW<T> {
91    /// The current value of the register.
92    pub value: T,
93    /// The logical timestamp associated with this value.
94    pub ts: u64,
95    /// The replica ID of the writer (for deterministic tie-breaking).
96    pub replica: u64,
97}
98
99impl<T: Clone + PartialEq> JoinSemilattice for LWW<T> {
100    fn join(&self, other: &Self) -> Self {
101        match self.ts.cmp(&other.ts) {
102            Ordering::Greater => self.clone(),
103            Ordering::Less => other.clone(),
104            Ordering::Equal => {
105                // Tie-break by replica: higher replica wins
106                if self.replica > other.replica {
107                    self.clone()
108                } else if other.replica > self.replica {
109                    other.clone()
110                } else {
111                    // Same (ts, replica) should mean same write
112                    // (duplicate delivery)
113                    debug_assert!(
114                        self.value == other.value,
115                        "LWW collision: same (ts, replica) but different values"
116                    );
117                    self.clone()
118                }
119            }
120        }
121    }
122}
123
124impl<T: Clone + PartialEq + Default> BoundedJoinSemilattice for LWW<T> {
125    fn bottom() -> Self {
126        LWW {
127            value: T::default(),
128            ts: 0,
129            replica: 0,
130        }
131    }
132}
133
134impl<T> LWW<T> {
135    /// Create a new LWW register with the given value, timestamp, and
136    /// replica ID.
137    pub fn new(value: T, ts: u64, replica: u64) -> Self {
138        LWW { value, ts, replica }
139    }
140
141    /// Get the current value.
142    pub fn get(&self) -> &T {
143        &self.value
144    }
145
146    /// Get the timestamp.
147    pub fn timestamp(&self) -> u64 {
148        self.ts
149    }
150
151    /// Get the replica ID.
152    pub fn replica(&self) -> u64 {
153        self.replica
154    }
155}
156
#[cfg(test)]
mod tests {
    use super::*;

    /// The later timestamp wins in either argument order, even when its
    /// value is numerically smaller.
    #[test]
    fn lww_join_takes_higher_timestamp() {
        let older = LWW::new(100, 1, 0);
        let newer = LWW::new(50, 2, 0);
        let expected = LWW::new(50, 2, 0);

        assert_eq!(older.join(&newer), expected);
        assert_eq!(newer.join(&older), expected);
    }

    /// With equal timestamps the higher replica ID wins in either order.
    #[test]
    fn lww_join_on_tie_higher_replica_wins() {
        let low_replica = LWW::new(100, 1, 1);
        let high_replica = LWW::new(200, 1, 2);

        assert_eq!(low_replica.join(&high_replica), high_replica);
        assert_eq!(high_replica.join(&low_replica), high_replica);
    }

    /// Joining a register with itself yields the same register.
    #[test]
    fn lww_is_idempotent() {
        let reg = LWW::new(42, 5, 1);
        let joined = reg.join(&reg);
        assert_eq!(joined, reg);
    }

    /// Argument order does not affect the result of a join.
    #[test]
    fn lww_is_commutative() {
        let first = LWW::new(10, 1, 1);
        let second = LWW::new(20, 2, 2);
        let third = LWW::new(30, 3, 3);

        assert_eq!(first.join(&second), second.join(&first));

        let forward = first.join(&second).join(&third);
        let backward = third.join(&second).join(&first);
        assert_eq!(forward, backward);

        // Equal timestamps, distinct replicas: the tie-break must also be
        // symmetric.
        let tie_low = LWW::new(100, 5, 1);
        let tie_high = LWW::new(200, 5, 2);
        assert_eq!(tie_low.join(&tie_high), tie_high.join(&tie_low));
    }

    /// Grouping of joins does not affect the result.
    #[test]
    fn lww_is_associative() {
        let first = LWW::new(10, 1, 1);
        let second = LWW::new(20, 2, 2);
        let third = LWW::new(30, 3, 3);

        let left_grouped = first.join(&second).join(&third);
        let right_grouped = first.join(&second.join(&third));
        assert_eq!(left_grouped, right_grouped);

        // Same check when every register shares one timestamp.
        let tie_a = LWW::new(10, 5, 1);
        let tie_b = LWW::new(20, 5, 2);
        let tie_c = LWW::new(30, 5, 3);

        let tie_left = tie_a.join(&tie_b).join(&tie_c);
        let tie_right = tie_a.join(&tie_b.join(&tie_c));
        assert_eq!(tie_left, tie_right);
    }

    /// Joining with bottom on either side returns the other register.
    #[test]
    fn lww_bottom_is_identity() {
        let reg = LWW::new(42, 5, 1);
        let least = LWW::<i32>::bottom();

        assert_eq!(least.join(&reg), reg);
        assert_eq!(reg.join(&least), reg);
    }

    /// A register survives a bincode encode/decode cycle unchanged.
    #[test]
    fn lww_serde_roundtrip() {
        let original = LWW::new(42, 12345, 99);
        let bytes = bincode::serialize(&original).unwrap();
        let restored: LWW<i32> = bincode::deserialize(&bytes).unwrap();
        assert_eq!(restored, original);
    }
}
233}