hyperactor_mesh/v1/value_mesh.rs

/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 * All rights reserved.
 *
 * This source code is licensed under the BSD-style license found in the
 * LICENSE file in the root directory of this source tree.
 */

use std::cmp::Ordering;
use std::collections::HashMap;
use std::collections::hash_map::Entry;
use std::hash::Hash;
use std::marker::PhantomData;
use std::mem;
use std::mem::MaybeUninit;
use std::ops::Range;
use std::ptr;
use std::ptr::NonNull;

use futures::Future;
use hyperactor::Named;
use hyperactor::accum::Accumulator;
use hyperactor::accum::CommReducer;
use hyperactor::accum::ReducerFactory;
use hyperactor::accum::ReducerSpec;
use ndslice::extent;
use ndslice::view;
use ndslice::view::Ranked;
use ndslice::view::Region;
use serde::Deserialize;
use serde::Serialize;

mod rle;
mod value_overlay;
pub use value_overlay::BuildError;
pub use value_overlay::ValueOverlay;
/// A mesh of values, one per rank in `region`.
///
/// The internal representation (`rep`) may be dense or compressed,
/// but externally the mesh always behaves as a complete mapping from
/// rank index → value.
///
/// # Invariants
/// - Complete: every rank in `region` has exactly one value.
/// - Order: iteration and indexing follow the region's linearization.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)] // only if T implements
pub struct ValueMesh<T> {
    /// The logical multidimensional domain of the mesh.
    ///
    /// Determines the number of ranks (`region.num_ranks()`) and the
    /// order in which they are traversed.
    region: Region,

    /// The underlying storage representation.
    ///
    /// - `Rep::Dense` stores a `Vec<T>` with one value per rank.
    /// - `Rep::Compressed` stores a run-length encoded table of
    ///   unique values plus `(Range<usize>, u32)` pairs describing
    ///   contiguous runs of identical values.
    ///
    /// The representation is an internal optimization detail; all
    /// public APIs (e.g. `values()`, `get()`, slicing) behave as if
    /// the mesh were dense.
    rep: Rep<T>,
}

/// A single run-length-encoded (RLE) segment within a [`ValueMesh`].
///
/// Each `Run` represents a contiguous range of ranks `[start, end)`
/// that all share the same value, referenced indirectly via a table
/// index `id`. This allows compact storage of large regions with
/// repeated values.
///
/// Runs are serialized in a stable, portable format using `u64` for
/// range bounds (`start`, `end`) to avoid platform-dependent `usize`
/// encoding differences.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
struct Run {
    /// Inclusive start of the contiguous range of ranks (0-based).
    start: u64,
    /// Exclusive end of the contiguous range of ranks (0-based).
    end: u64,
    /// Index into the value table for this run's shared value.
    id: u32,
}

impl Run {
    /// Creates a new `Run` covering ranks `[start, end)` that all
    /// share the same table entry `id`.
    ///
    /// Converts `usize` bounds to `u64` for stable serialization.
    fn new(start: usize, end: usize, id: u32) -> Self {
        Self {
            start: start as u64,
            end: end as u64,
            id,
        }
    }
}

impl TryFrom<Run> for (Range<usize>, u32) {
    type Error = &'static str;

    /// Converts a serialized [`Run`] back into its in-memory form
    /// `(Range<usize>, u32)`.
    ///
    /// Performs checked conversion of the 64-bit wire fields back
    /// into `usize` indices, returning an error if either bound
    /// exceeds the platform's addressable range. This ensures safe
    /// round-tripping between the serialized wire format and native
    /// representation.
    #[allow(clippy::result_large_err)]
    fn try_from(r: Run) -> Result<Self, Self::Error> {
        let start = usize::try_from(r.start).map_err(|_| "run.start too large")?;
        let end = usize::try_from(r.end).map_err(|_| "run.end too large")?;
        Ok((start..end, r.id))
    }
}

/// Internal storage representation for a [`ValueMesh`].
///
/// This enum abstracts how the per-rank values are stored.
/// Externally, both variants behave identically — the difference is
/// purely in memory layout and access strategy.
///
/// - [`Rep::Dense`] stores one value per rank, directly.
/// - [`Rep::Compressed`] stores a compact run-length-encoded form,
///   reusing identical values across contiguous ranks.
///
/// Users of [`ValueMesh`] normally never interact with `Rep`
/// directly; all iteration and slicing APIs present a dense logical
/// view.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)] // only if T implements
#[serde(tag = "rep", rename_all = "snake_case")]
enum Rep<T> {
    /// Fully expanded representation: one element per rank.
    ///
    /// The length of `values` is always equal to
    /// `region.num_ranks()`. This form is simple and fast for
    /// iteration and mutation but uses more memory when large runs of
    /// repeated values are present.
    Dense {
        /// Flat list of values, one per rank in the region's
        /// linearization order.
        values: Vec<T>,
    },

    /// Run-length-encoded representation.
    ///
    /// Each run `(Range<usize>, id)` indicates that the ranks within
    /// `Range` (half-open `[start, end)`) share the same value at
    /// `table[id]`. The `table` stores each distinct value once.
    ///
    /// # Invariants
    /// - Runs are non-empty and contiguous (`r.start < r.end`).
    /// - Runs collectively cover `0..region.num_ranks()` with no gaps
    ///   or overlaps.
    /// - `id` indexes into `table` (`id < table.len()`).
    Compressed {
        /// The deduplicated set of unique values referenced by runs.
        table: Vec<T>,

        /// List of `(range, table_id)` pairs describing contiguous
        /// runs of identical values in region order.
        runs: Vec<Run>,
    },
}

// At this time, Default is used primarily to satisfy the mailbox
// Accumulator bound. It constructs an empty (zero-rank) mesh. Other
// contexts may also use this as a generic "empty mesh" initializer
// when a concrete region is not yet known.
impl<T> Default for ValueMesh<T> {
    fn default() -> Self {
        Self::empty()
    }
}

impl<T> ValueMesh<T> {
    /// Returns an *empty* mesh: a 1-dimensional region of length 0.
    ///
    /// This differs from a *dimensionless* (0-D) region, which
    /// represents a single scalar element. A zero-length 1-D region
    /// has **no elements at all**, making it the natural `Default`
    /// placeholder for accumulator state initialization.
    pub fn empty() -> Self {
        // zero-rank region; no constraints on T
        let region = extent!(r = 0).into();
        Self::new_unchecked(region, Vec::<T>::new())
    }

    /// Creates a new `ValueMesh` for `region` with exactly one value
    /// per rank.
    ///
    /// # Invariants
    /// This constructor validates that the number of provided values
    /// (`ranks.len()`) matches the region's cardinality
    /// (`region.num_ranks()`). A value mesh must be complete: every
    /// rank in `region` has a corresponding `T`.
    ///
    /// # Errors
    /// Returns [`Error::InvalidRankCardinality`] if `ranks.len() !=
    /// region.num_ranks()`.
    #[allow(clippy::result_large_err)]
    pub(crate) fn new(region: Region, ranks: Vec<T>) -> crate::v1::Result<Self> {
        let (actual, expected) = (ranks.len(), region.num_ranks());
        if actual != expected {
            return Err(crate::v1::Error::InvalidRankCardinality { expected, actual });
        }
        Ok(Self {
            region,
            rep: Rep::Dense { values: ranks },
        })
    }

    /// Constructs a `ValueMesh` without checking cardinality. Caller
    /// must ensure `ranks.len() == region.num_ranks()`.
    #[inline]
    pub(crate) fn new_unchecked(region: Region, ranks: Vec<T>) -> Self {
        debug_assert_eq!(region.num_ranks(), ranks.len());
        Self {
            region,
            rep: Rep::Dense { values: ranks },
        }
    }
}

impl<T: Clone> ValueMesh<T> {
    /// Builds a `ValueMesh` that assigns the single value `s` to
    /// every rank in `region`, without materializing a dense
    /// `Vec<T>`. The result is stored in compressed (RLE) form as a
    /// single run `[0..N)`.
    ///
    /// If `region.num_ranks() == 0`, the mesh contains no runs (and
    /// an empty table), regardless of `s`.
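    ///
    /// # Examples
    /// A minimal sketch (compile-ignored), assuming a 1-D region:
    /// ```ignore
    /// let region: Region = extent!(n = 4).into();
    /// let mesh = ValueMesh::from_single(region, "ok");
    /// // Stored as one run [0..4) referencing a single table entry.
    /// assert_eq!(mesh.values().collect::<Vec<_>>(), vec!["ok"; 4]);
    /// ```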
    pub fn from_single(region: Region, s: T) -> Self {
        let n = region.num_ranks();
        if n == 0 {
            return Self {
                region,
                rep: Rep::Compressed {
                    table: Vec::new(),
                    runs: Vec::new(),
                },
            };
        }

        let table = vec![s];
        let runs = vec![Run::new(0, n, 0)];
        Self {
            region,
            rep: Rep::Compressed { table, runs },
        }
    }
}

impl<T: Clone + Default> ValueMesh<T> {
    /// Builds a [`ValueMesh`] covering the region, filled with
    /// `T::default()`.
    ///
    /// Equivalent to [`ValueMesh::from_single(region,
    /// T::default())`].
    pub fn from_default(region: Region) -> Self {
        ValueMesh::<T>::from_single(region, T::default())
    }
}

impl<T: Eq + Hash> ValueMesh<T> {
    /// Builds a compressed mesh from a default value and a set of
    /// disjoint ranges that override the default.
    ///
    /// - `ranges` may be in any order; they must be non-empty,
    ///   in-bounds, and non-overlapping.
    /// - Unspecified ranks are filled with `default`.
    /// - Result is stored in RLE form; no dense `Vec<T>` is
    ///   materialized.
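    ///
    /// # Examples
    /// A minimal sketch (compile-ignored), assuming a 1-D region of
    /// ten ranks:
    /// ```ignore
    /// let region: Region = extent!(n = 10).into();
    /// let mesh =
    ///     ValueMesh::from_ranges_with_default(region, 0, vec![(2..4, 1), (7..9, 2)]).unwrap();
    /// // Logical values: [0, 0, 1, 1, 0, 0, 0, 2, 2, 0]
    /// ```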
    #[allow(clippy::result_large_err)]
    pub fn from_ranges_with_default(
        region: Region,
        default: T,
        mut ranges: Vec<(Range<usize>, T)>,
    ) -> crate::v1::Result<Self> {
        let n = region.num_ranks();

        if n == 0 {
            return Ok(Self {
                region,
                rep: Rep::Compressed {
                    table: Vec::new(),
                    runs: Vec::new(),
                },
            });
        }

        // Validate: non-empty, in-bounds; then sort.
        for (r, _) in &ranges {
            if r.is_empty() {
                return Err(crate::v1::Error::InvalidRankCardinality {
                    expected: n,
                    actual: 0,
                }); // TODO: this surfaces the error but it's not a great fit
            }
            if r.end > n {
                return Err(crate::v1::Error::InvalidRankCardinality {
                    expected: n,
                    actual: r.end,
                });
            }
        }
        ranges.sort_by_key(|(r, _)| (r.start, r.end));

        // Validate: non-overlapping.
        for w in ranges.windows(2) {
            let (a, _) = &w[0];
            let (b, _) = &w[1];
            if a.end > b.start {
                // Overlap
                return Err(crate::v1::Error::InvalidRankCardinality {
                    expected: n,
                    actual: b.start, // TODO: this surfaces the error but is a bad fit
                });
            }
        }

        // Internal index: value -> table id (assigned once).
        let mut index: HashMap<T, u32> = HashMap::with_capacity(1 + ranges.len());
        let mut next_id: u32 = 0;

        // Helper: assign or reuse an id by value, taking ownership of
        // v.
        let id_of = |v: T, index: &mut HashMap<T, u32>, next_id: &mut u32| -> u32 {
            match index.entry(v) {
                Entry::Occupied(o) => *o.get(),
                Entry::Vacant(vac) => {
                    let id = *next_id;
                    *next_id += 1;
                    vac.insert(id);
                    id
                }
            }
        };

        let default_id = id_of(default, &mut index, &mut next_id);

        let mut runs: Vec<Run> = Vec::with_capacity(1 + 2 * ranges.len());
        let mut cursor = 0usize;

        for (r, v) in ranges.into_iter() {
            // Fill default gap if any.
            if cursor < r.start {
                runs.push(Run::new(cursor, r.start, default_id));
            }
            // Override block.
            let id = id_of(v, &mut index, &mut next_id);
            runs.push(Run::new(r.start, r.end, id));
            cursor = r.end;
        }

        // Trailing default tail.
        if cursor < n {
            runs.push(Run::new(cursor, n, default_id));
        }

        // Materialize table in id order without cloning: move keys
        // out of the map.
        let mut table_slots: Vec<Option<T>> = Vec::new();
        table_slots.resize_with(next_id as usize, || None);

        for (t, id) in index.into_iter() {
            table_slots[id as usize] = Some(t);
        }

        let table: Vec<T> = table_slots
            .into_iter()
            .map(|o| o.expect("every id must be assigned"))
            .collect();

        Ok(Self {
            region,
            rep: Rep::Compressed { table, runs },
        })
    }
    /// Builds a [`ValueMesh`] from a fully materialized dense vector
    /// of per-rank values, then compresses it into run-length-encoded
    /// form.
    ///
    /// This constructor is intended for callers that already have one
    /// value per rank (e.g. computed or received data) but wish to
    /// store it efficiently.
    ///
    /// # Parameters
    /// - `region`: The logical region describing the mesh's shape and
    ///   rank order.
    /// - `values`: A dense vector of values, one per rank in
    ///   `region`.
    ///
    /// # Returns
    /// A [`ValueMesh`] stored in compressed (RLE) form, with adjacent
    /// equal values coalesced into runs.
    ///
    /// # Errors
    /// Returns an error if the number of provided `values` does not
    /// match the number of ranks in `region`.
    ///
    /// # Examples
    /// ```ignore
    /// let region: Region = extent!(n = 5).into();
    /// let mesh = ValueMesh::from_dense(region, vec![1, 1, 2, 2, 3]).unwrap();
    /// // Internally compressed to three runs: [1, 1], [2, 2], [3]
    /// ```
    #[allow(clippy::result_large_err)]
    pub fn from_dense(region: Region, values: Vec<T>) -> crate::v1::Result<Self> {
        let mut vm = Self::new(region, values)?;
        vm.compress_adjacent_in_place();
        Ok(vm)
    }
}

impl<F: Future> ValueMesh<F> {
    /// Await all futures in the mesh, yielding a `ValueMesh` of their
    /// outputs.
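    ///
    /// # Panics
    /// Panics if called on a compressed mesh; in the current
    /// implementation futures are only ever stored densely, so the
    /// compressed arm is unreachable.
    ///
    /// # Examples
    /// A minimal sketch (compile-ignored), assuming
    /// `futures::future::ready`:
    /// ```ignore
    /// let region: Region = extent!(n = 2).into();
    /// let mesh = ValueMesh::new(region, vec![future::ready(1), future::ready(2)]).unwrap();
    /// let joined = mesh.join().await; // ValueMesh<i32> with values [1, 2]
    /// ```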
    pub async fn join(self) -> ValueMesh<F::Output> {
        let ValueMesh { region, rep } = self;

        match rep {
            Rep::Dense { values } => {
                let results = futures::future::join_all(values).await;
                ValueMesh::new_unchecked(region, results)
            }
            Rep::Compressed { .. } => {
                unreachable!("join() not implemented for compressed meshes")
            }
        }
    }
}

impl<T, E> ValueMesh<Result<T, E>> {
    /// Transposes a `ValueMesh<Result<T, E>>` into a
    /// `Result<ValueMesh<T>, E>`.
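    ///
    /// # Examples
    /// A minimal sketch (compile-ignored):
    /// ```ignore
    /// let region: Region = extent!(n = 2).into();
    /// let mesh = ValueMesh::new(region, vec![Ok::<_, String>(1), Ok(2)]).unwrap();
    /// let ok: ValueMesh<i32> = mesh.transpose().unwrap(); // values [1, 2]
    /// ```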
    pub fn transpose(self) -> Result<ValueMesh<T>, E> {
        let ValueMesh { region, rep } = self;

        match rep {
            Rep::Dense { values } => {
                let values = values.into_iter().collect::<Result<Vec<T>, E>>()?;
                Ok(ValueMesh::new_unchecked(region, values))
            }
            Rep::Compressed { table, runs } => {
                let table: Vec<T> = table.into_iter().collect::<Result<Vec<T>, E>>()?;
                Ok(ValueMesh {
                    region,
                    rep: Rep::Compressed { table, runs },
                })
            }
        }
    }
}

impl<T: 'static> view::Ranked for ValueMesh<T> {
    type Item = T;

    /// Returns the region that defines this mesh's shape and rank
    /// order.
    fn region(&self) -> &Region {
        &self.region
    }

    /// Looks up the value at the given linearized rank.
    ///
    /// Works transparently for both dense and compressed
    /// representations:
    /// - In the dense case, it simply indexes into the `values`
    ///   vector.
    /// - In the compressed case, it performs a binary search over run
    ///   boundaries to find which run contains the given rank, then
    ///   returns the corresponding entry from `table`.
    ///
    /// Returns `None` if the rank is out of bounds.
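    ///
    /// # Examples
    /// A minimal sketch (compile-ignored) against a compressed mesh:
    /// ```ignore
    /// let region: Region = extent!(n = 5).into();
    /// let mesh = ValueMesh::from_dense(region, vec![7, 7, 7, 9, 9]).unwrap();
    /// assert_eq!(mesh.get(2), Some(&7)); // inside run [0..3)
    /// assert_eq!(mesh.get(4), Some(&9)); // inside run [3..5)
    /// assert_eq!(mesh.get(5), None);     // out of bounds
    /// ```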
    fn get(&self, rank: usize) -> Option<&Self::Item> {
        if rank >= self.region.num_ranks() {
            return None;
        }

        match &self.rep {
            Rep::Dense { values } => values.get(rank),

            Rep::Compressed { table, runs } => {
                let rank = rank as u64;

                // Binary search over runs: find the one whose range
                // contains `rank`.
                let idx = runs
                    .binary_search_by(|run| {
                        if run.end <= rank {
                            Ordering::Less
                        } else if run.start > rank {
                            Ordering::Greater
                        } else {
                            Ordering::Equal
                        }
                    })
                    .ok()?;

                // Map the run's table ID to its actual value.
                let id = runs[idx].id as usize;
                table.get(id)
            }
        }
    }
}

impl<T: Clone + 'static> view::RankedSliceable for ValueMesh<T> {
    fn sliced(&self, region: Region) -> Self {
        debug_assert!(region.is_subset(self.region()), "sliced: not a subset");
        let ranks: Vec<T> = self
            .region()
            .remap(&region)
            .unwrap()
            .map(|index| self.get(index).unwrap().clone())
            .collect();
        debug_assert_eq!(
            region.num_ranks(),
            ranks.len(),
            "sliced: cardinality mismatch"
        );
        Self::new_unchecked(region, ranks)
    }
}

impl<T> view::BuildFromRegion<T> for ValueMesh<T> {
    type Error = crate::v1::Error;

    fn build_dense(region: Region, values: Vec<T>) -> Result<Self, Self::Error> {
        Self::new(region, values)
    }

    fn build_dense_unchecked(region: Region, values: Vec<T>) -> Self {
        Self::new_unchecked(region, values)
    }
}

impl<T> view::BuildFromRegionIndexed<T> for ValueMesh<T> {
    type Error = crate::v1::Error;

    fn build_indexed(
        region: Region,
        pairs: impl IntoIterator<Item = (usize, T)>,
    ) -> Result<Self, Self::Error> {
        let n = region.num_ranks();

        // Allocate uninitialized buffer for T.
        // Note: Vec<MaybeUninit<T>>'s Drop will only free the
        // allocation; it never runs T's destructor. We must
        // explicitly drop any initialized elements (DropGuard) or
        // convert into Vec<T>.
        let mut buf: Vec<MaybeUninit<T>> = Vec::with_capacity(n);
        // SAFETY: set `len = n` to treat the buffer as n uninit slots
        // of `MaybeUninit<T>`. We never read before `ptr::write`,
        // drop only slots marked initialized (bitset), and convert to
        // `Vec<T>` only once all 0..n are initialized (guard enforces
        // this).
        unsafe {
            buf.set_len(n);
        }

        // Compact bitset for occupancy.
        let words = n.div_ceil(64);
        let mut bits = vec![0u64; words];
        let mut filled = 0usize;

        // Drop guard: cleans up initialized elements on early exit.
        // Stores raw, non-borrowed pointers (`NonNull`), so we don't
        // hold Rust references for the whole scope. This allows
        // mutating `buf`/`bits` inside the loop while still letting
        // the guard access them if dropped early.
        struct DropGuard<T> {
            buf: NonNull<MaybeUninit<T>>,
            bits: NonNull<u64>,
            n_elems: usize,
            n_words: usize,
            disarm: bool,
        }

        impl<T> DropGuard<T> {
            /// # Safety
            /// - `buf` points to `buf.len()` contiguous
            ///   `MaybeUninit<T>` and outlives the guard.
            /// - `bits` points to `bits.len()` contiguous `u64` words
            ///   and outlives the guard.
            /// - For every set bit `i` in `bits`, `buf[i]` has been
            ///   initialized with a valid `T` (and on duplicates, the
            ///   previous `T` at `i` was dropped before overwrite).
            /// - While the guard is alive, caller may mutate
            ///   `buf`/`bits` (the guard holds only raw pointers; it
            ///   does not create Rust borrows).
            /// - On drop (when not disarmed), the guard will read
            ///   `bits`, mask tail bits in the final word, and
            ///   `drop_in_place` each initialized `buf[i]`—never
            ///   accessing beyond `buf.len()` / `bits.len()`.
            /// - If a slice is empty, the stored pointer may be
            ///   `dangling()`; it is never dereferenced when the
            ///   corresponding length is zero.
            unsafe fn new(buf: &mut [MaybeUninit<T>], bits: &mut [u64]) -> Self {
                let n_elems = buf.len();
                let n_words = bits.len();
                // Invariant typically: n_words == (n_elems + 63) / 64
                // but we don't *require* it; tail is masked in Drop.
                Self {
                    buf: NonNull::new(buf.as_mut_ptr()).unwrap_or_else(NonNull::dangling),
                    bits: NonNull::new(bits.as_mut_ptr()).unwrap_or_else(NonNull::dangling),
                    n_elems,
                    n_words,
                    disarm: false,
                }
            }

            #[inline]
            fn disarm(&mut self) {
                self.disarm = true;
            }
        }

        impl<T> Drop for DropGuard<T> {
            fn drop(&mut self) {
                if self.disarm {
                    return;
                }

                // SAFETY:
                // - `self.buf` points to `n_elems` contiguous
                //   `MaybeUninit<T>` slots (or may be dangling if
                //   `n_elems == 0`); `self.bits` points to `n_words`
                //   contiguous `u64` words (or may be dangling if
                //   `n_words == 0`).
                // - Loop bounds ensure `w < n_words` when reading
                //   `*bits_base.add(w)`.
                // - For the final word we mask unused tail bits so
                //   any computed index `i = w * 64 + tz` always
                //   satisfies `i < n_elems` before we dereference
                //   `buf_base.add(i)`.
                // - Only slots whose bits are set are dropped, so no
                //   double-drops.
                // - No aliasing with active Rust borrows: the guard
                //   holds raw pointers and runs in `Drop` after the
                //   fill loop.
                unsafe {
                    let buf_base = self.buf.as_ptr();
                    let bits_base = self.bits.as_ptr();

                    for w in 0..self.n_words {
                        // Load word.
                        let mut word = *bits_base.add(w);

                        // Mask off bits beyond `n_elems` in the final
                        // word (if any).
                        if w == self.n_words.saturating_sub(1) {
                            let used_bits = self.n_elems.saturating_sub(w * 64);
                            if used_bits < 64 {
                                let mask = if used_bits == 0 {
                                    0
                                } else {
                                    (1u64 << used_bits) - 1
                                };
                                word &= mask;
                            }
                        }

                        // Fast scan set bits.
                        while word != 0 {
                            let tz = word.trailing_zeros() as usize;
                            let i = w * 64 + tz;
                            debug_assert!(i < self.n_elems);

                            let slot = buf_base.add(i);
                            // Drop the initialized element.
                            ptr::drop_in_place((*slot).as_mut_ptr());

                            // clear the bit we just handled
                            word &= word - 1;
                        }
                    }
                }
            }
        }

        // SAFETY:
        // - `buf` and `bits` are freshly allocated Vecs with
        //   capacity/len set to cover exactly `n_elems` and `n_words`,
        //   so their `.as_mut_ptr()` is valid for that many elements.
        // - Both slices live at least as long as the guard, and are
        //   not moved until after the guard is disarmed.
        // - No aliasing occurs: the guard holds only raw pointers and
        //   the fill loop mutates through those same allocations.
        let mut guard = unsafe { DropGuard::new(&mut buf, &mut bits) };

        for (rank, value) in pairs {
            // Single bounds check up front.
            if rank >= guard.n_elems {
                return Err(crate::v1::Error::InvalidRankCardinality {
                    expected: guard.n_elems,
                    actual: rank + 1,
                });
            }

            // Compute word index and bit mask once.
            let w = rank / 64;
            let b = rank % 64;
            let mask = 1u64 << b;

            // SAFETY:
            // - `rank < guard.n_elems` was checked above, so
            //   `buf_slot = buf.add(rank)` is within the
            //   `Vec<MaybeUninit<T>>` allocation.
            // - `w = rank / 64` and `bits.len() == (n + 63) / 64`
            //   ensure `bits_ptr = bits.add(w)` is in-bounds.
            // - If `(word & mask) != 0`, then this slot was
            //   previously initialized;
            //   `drop_in_place((*buf_slot).as_mut_ptr())` is valid and
            //   leaves the slot uninitialized.
            // - If the bit was clear, we set it and count `filled +=
            //   1`.
            // - `(*buf_slot).write(value)` is valid in both cases:
            //   either writing into an uninitialized slot or
            //   immediately after dropping the prior `T`.
            // - No aliasing with Rust references: the guard holds raw
            //   pointers and we have exclusive ownership of
            //   `buf`/`bits` within this function.
            unsafe {
                // Pointers from the guard (no long-lived & borrows).
                let bits_ptr = guard.bits.as_ptr().add(w);
                let buf_slot = guard.buf.as_ptr().add(rank);

                // Read the current word.
                let word = *bits_ptr;

                if (word & mask) != 0 {
                    // Duplicate: drop old value before overwriting.
                    core::ptr::drop_in_place((*buf_slot).as_mut_ptr());
                    // (Bit already set; no need to set again; don't
                    // bump `filled`.)
                } else {
                    // First time we see this rank.
                    *bits_ptr = word | mask;
                    filled += 1;
                }

                // Write new value into the slot.
                (*buf_slot).write(value);
            }
        }

        if filled != n {
            // Missing ranks: actual = number of distinct ranks seen.
            return Err(crate::v1::Error::InvalidRankCardinality {
                expected: n,
                actual: filled,
            });
        }

        // Success: prevent cleanup.
        guard.disarm();

        // SAFETY: all n slots are initialized
        let ranks = unsafe {
            let ptr = buf.as_mut_ptr() as *mut T;
            let len = buf.len();
            let cap = buf.capacity();
            // Prevent `buf` (Vec<MaybeUninit<T>>) from freeing the
            // allocation. Ownership of the buffer is about to be
            // transferred to `Vec<T>` via `from_raw_parts`.
            // Forgetting avoids a double free.
            mem::forget(buf);
            Vec::from_raw_parts(ptr, len, cap)
        };

        Ok(Self::new_unchecked(region, ranks))
    }
}

impl<T: PartialEq> ValueMesh<T> {
    /// Compresses the mesh in place using run-length encoding (RLE).
    ///
    /// This method scans the mesh's dense values, coalescing adjacent
    /// runs of identical elements into a compact [`Rep::Compressed`]
    /// representation. It replaces the internal storage (`rep`) with
    /// the compressed form.
    ///
    /// # Behavior
    /// - If the mesh is already compressed, this is a **no-op**.
    /// - If the mesh is dense, it consumes the current `Vec<T>` and
    ///   rebuilds the representation as a run table plus value table.
    /// - Only *adjacent* equal values are merged; non-contiguous
    ///   duplicates remain distinct.
    ///
    /// # Requirements
    /// - `T` must implement [`PartialEq`] (to detect equal values).
    ///
    /// This operation is lossless: expanding the compressed mesh back
    /// into a dense vector yields the same sequence of values.
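    ///
    /// # Examples
    /// A minimal sketch (compile-ignored):
    /// ```ignore
    /// let region: Region = extent!(n = 4).into();
    /// let mut mesh = ValueMesh::new(region, vec![1, 1, 2, 2]).unwrap();
    /// mesh.compress_adjacent_in_place();
    /// // Now stored as two runs: [0..2) -> 1, [2..4) -> 2.
    /// ```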
    pub fn compress_adjacent_in_place(&mut self) {
        self.compress_adjacent_in_place_by(|a, b| a == b)
    }
}

impl<T: Clone + PartialEq> ValueMesh<T> {
    /// Materializes this mesh into a vector of `(Range<usize>, T)`
    /// runs.
    ///
    /// For a dense representation, this walks the value vector and
    /// groups adjacent equal values into contiguous runs. The result
    /// is equivalent to what would be stored in a compressed
    /// representation, but the mesh itself is not mutated or
    /// re-encoded. This is purely a read-only view.
    ///
    /// For a compressed representation, the stored runs are simply
    /// cloned.
    ///
    /// This method is intended for inspection, testing, and
    /// diff/merge operations that need a uniform view of value runs
    /// without changing the underlying representation.
    fn materialized_runs(&self) -> Vec<(Range<usize>, T)> {
        match &self.rep {
            Rep::Dense { values } => {
                // Coalesce adjacent equals into runs.
                let mut out = Vec::new();
                if values.is_empty() {
                    return out;
                }
                let mut start = 0usize;
                for i in 1..values.len() {
                    if values[i] != values[i - 1] {
                        out.push((start..i, values[i - 1].clone()));
                        start = i;
                    }
                }
                out.push((start..values.len(), values.last().unwrap().clone()));
                out
            }
            Rep::Compressed { table, runs } => runs
                .iter()
                .map(|r| {
                    let id = r.id as usize;
                    ((r.start as usize..r.end as usize), table[id].clone())
                })
                .collect(),
        }
    }
}

impl<T: Clone + Eq> ValueMesh<T> {
    /// Merge a sparse overlay into this mesh.
    ///
    /// Overlay segments are applied with **last-writer-wins**
    /// precedence on overlap (identical to `RankedValues::merge_from`
    /// behavior). The result is stored compressed.
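    ///
    /// # Examples
    /// A minimal sketch (compile-ignored), assuming an overlay built
    /// via [`ValueOverlay::try_from_runs`] as in the reducer below:
    /// ```ignore
    /// let mut mesh = ValueMesh::from_single(extent!(n = 4).into(), 0);
    /// let overlay = ValueOverlay::try_from_runs(vec![(1..3, 9)]).unwrap();
    /// mesh.merge_from_overlay(overlay).unwrap();
    /// // Logical values: [0, 9, 9, 0]
    /// ```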
    pub fn merge_from_overlay(&mut self, overlay: ValueOverlay<T>) -> Result<(), BuildError> {
        let n = self.region.num_ranks();

        // Bounds validation (structure already validated by
        // ValueOverlay).
        for (r, _) in overlay.runs() {
            if r.end > n {
                return Err(BuildError::OutOfBounds {
                    range: r.clone(),
                    region_len: n,
                });
            }
        }

        // Left: current mesh as normalized value-bearing runs.
        let left = self.materialized_runs();
        // Right: overlay runs (already sorted, non-overlapping,
        // coalesced).
        let right: Vec<(std::ops::Range<usize>, T)> = overlay.runs().cloned().collect();

        // Merge with overlay precedence, reusing the same splitting
        // strategy as RankedValues::merge_from.
        let merged = rle::merge_value_runs(left, right);

        // Re-encode to compressed representation:
        let (table, raw_runs) = rle::rle_from_value_runs(merged);
        let runs = raw_runs
            .into_iter()
            .map(|(r, id)| Run::new(r.start, r.end, id))
            .collect();
        self.rep = Rep::Compressed { table, runs };

        Ok(())
    }
}

impl<T> ValueMesh<T> {
    /// Compresses the mesh in place using a custom equivalence
    /// predicate.
    ///
    /// This is a generalized form of [`compress_adjacent_in_place`]
    /// that merges adjacent values according to an arbitrary
    /// predicate `same(a, b)`, rather than relying on `PartialEq`.
    ///
    /// # Behavior
    /// - If the mesh is already compressed, this is a **no-op**.
    /// - Otherwise, consumes the dense `Vec<T>` and replaces it with
    ///   a run-length encoded (`Rep::Compressed`) representation,
    ///   where consecutive elements satisfying `same(a, b)` are
    ///   coalesced into a single run.
    ///
    /// # Requirements
    /// - The predicate must be reflexive and symmetric for
    ///   correctness.
    ///
    /// This operation is lossless: expanding the compressed mesh
    /// reproduces the original sequence exactly under the same
    /// equivalence.
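    ///
    /// # Examples
    /// A minimal sketch (compile-ignored) using a case-insensitive
    /// predicate:
    /// ```ignore
    /// let region: Region = extent!(n = 4).into();
    /// let mut mesh = ValueMesh::new(region, vec!["A", "a", "b", "B"]).unwrap();
    /// mesh.compress_adjacent_in_place_by(|x, y| x.eq_ignore_ascii_case(y));
    /// // Coalesced into two runs: ranks 0..2 and 2..4.
    /// ```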
    pub fn compress_adjacent_in_place_by<F>(&mut self, same: F)
    where
        F: FnMut(&T, &T) -> bool,
    {
        let values = match &mut self.rep {
            Rep::Dense { values } => std::mem::take(values),
            Rep::Compressed { .. } => return,
        };
        let (table, raw_runs) = rle::rle_from_dense(values, same);
        let runs = raw_runs
            .into_iter()
            .map(|(r, id)| Run::new(r.start, r.end, id))
            .collect();
        self.rep = Rep::Compressed { table, runs };
    }
}

/// Accumulates sparse overlay updates into an authoritative mesh.
///
/// Lifecycle:
/// - Mailbox initializes `State` via `Default` (empty mesh).
/// - On the first update, the accumulator clones `self` (the template
///   passed to `open_accum_port_opts`) into `state`. Callers pass a
///   template such as `StatusMesh::from_single(region, NotExist)`.
/// - Each overlay update is merged with right-wins semantics via
///   `merge_from_overlay`, and the accumulator emits a *full*
///   ValueMesh.
///
/// The accumulator's state is a [`ValueMesh<T>`] and its updates are
/// [`ValueOverlay<T>`] instances. On each update, the overlay's
/// normalized runs are merged into the mesh using
/// [`ValueMesh::merge_from_overlay`] with right-wins semantics.
///
/// This enables incremental reduction of sparse updates across
/// distributed reducers without materializing dense data.
impl<T> Accumulator for ValueMesh<T>
where
    T: Eq + Clone + Named,
{
    type State = Self;
    type Update = ValueOverlay<T>;

    fn accumulate(&self, state: &mut Self::State, update: Self::Update) -> anyhow::Result<()> {
        // Mailbox starts with A::State::default() (empty). On the
        // first update, re-initialize to our template (self), which
        // the caller constructed as "full NotExist over the target
        // region".
        if state.region().num_ranks() == 0 {
            *state = self.clone();
        }

        // Apply sparse delta into the authoritative mesh
        // (right-wins).
        state.merge_from_overlay(update)?;
        Ok(())
    }

    fn reducer_spec(&self) -> Option<ReducerSpec> {
        Some(ReducerSpec {
            typehash: <ValueOverlayReducer<T> as Named>::typehash(),
            builder_params: None,
        })
    }
}

/// Marker reducer type for [`ValueOverlay<T>`].
///
/// This reducer carries no state; it exists only to bind a concrete
/// type parameter `T` to the [`CommReducer`] implementation below.
/// Reduction is purely functional and uses right-wins merge semantics
/// defined in [`merge_value_runs`].
#[derive(Named)]
struct ValueOverlayReducer<T>(std::marker::PhantomData<T>);

/// Reducer for sparse overlay updates.
///
/// Combines two [`ValueOverlay<T>`] updates using right-wins
/// semantics: overlapping runs from `right` overwrite those in
/// `left`. The merged runs are then normalized and validated via
/// [`ValueOverlay::try_from_runs`].
///
/// Used by the corresponding [`Accumulator`] to perform distributed
/// reduction of incremental mesh updates.
impl<T> CommReducer for ValueOverlayReducer<T>
where
    T: Eq + Clone + Named,
{
    type Update = ValueOverlay<T>;

    // Last-writer-wins merge of two sparse overlays.
    fn reduce(&self, left: Self::Update, right: Self::Update) -> anyhow::Result<Self::Update> {
        // 1) Merge runs with right precedence.
        let merged = crate::v1::value_mesh::rle::merge_value_runs(
            left.runs().cloned().collect(),
            right.runs().cloned().collect(),
        );
        // 2) Re-normalize to an overlay (validates, coalesces).
        Ok(ValueOverlay::try_from_runs(merged)?)
    }
}

// register for concrete types:

hyperactor::submit! {
    ReducerFactory {
        typehash_f: <ValueOverlayReducer<crate::resource::Status> as Named>::typehash,
        builder_f: |_| Ok(Box::new(ValueOverlayReducer::<crate::resource::Status>(PhantomData))),
    }
}
#[cfg(test)]
mod tests {
    use std::convert::Infallible;
    use std::future::Future;
    use std::pin::Pin;
    use std::task::Context;
    use std::task::Poll;
    use std::task::RawWaker;
    use std::task::RawWakerVTable;
    use std::task::Waker;

    use futures::executor::block_on;
    use futures::future;
    use ndslice::extent;
    use ndslice::strategy::gen_region;
    use ndslice::view::CollectExactMeshExt;
    use ndslice::view::CollectIndexedMeshExt;
    use ndslice::view::CollectMeshExt;
    use ndslice::view::MapIntoExt;
    use ndslice::view::Ranked;
    use ndslice::view::RankedSliceable;
    use ndslice::view::ViewExt;
    use proptest::prelude::*;
    use proptest::strategy::ValueTree;
    use serde_json;

    use super::*;

    #[test]
    fn value_mesh_new_ok() {
        let region: Region = extent!(replica = 2, gpu = 3).into();
        let mesh = ValueMesh::new(region.clone(), (0..6).collect()).expect("new should succeed");
        assert_eq!(mesh.region().num_ranks(), 6);
        assert_eq!(mesh.values().count(), 6);
        assert_eq!(mesh.values().collect::<Vec<_>>(), vec![0, 1, 2, 3, 4, 5]);
    }

    #[test]
    fn value_mesh_new_len_mismatch_is_error() {
        let region: Region = extent!(replica = 2, gpu = 3).into();
        let err = ValueMesh::new(region, vec![0_i32; 5]).unwrap_err();
        match err {
            crate::v1::Error::InvalidRankCardinality { expected, actual } => {
                assert_eq!(expected, 6);
                assert_eq!(actual, 5);
            }
            other => panic!("unexpected error: {other:?}"),
        }
    }

    #[test]
    fn value_mesh_transpose_ok_and_err() {
        let region: Region = extent!(x = 2).into();

        // ok case
        let ok_mesh = ValueMesh::new(region.clone(), vec![Ok::<_, Infallible>(1), Ok(2)]).unwrap();
        let ok = ok_mesh.transpose().unwrap();
        assert_eq!(ok.values().collect::<Vec<_>>(), vec![1, 2]);

        // err case: propagate user E
        #[derive(Debug, PartialEq)]
        enum E {
            Boom,
        }
        let err_mesh = ValueMesh::new(region, vec![Ok(1), Err(E::Boom)]).unwrap();
        let err = err_mesh.transpose().unwrap_err();
        assert_eq!(err, E::Boom);
    }

    #[test]
    fn value_mesh_join_preserves_region_and_values() {
        let region: Region = extent!(x = 2, y = 2).into();
        let futs = vec![
            future::ready(10),
            future::ready(11),
            future::ready(12),
            future::ready(13),
        ];
        let mesh = ValueMesh::new(region.clone(), futs).unwrap();

        let joined = block_on(mesh.join());
        assert_eq!(joined.region().num_ranks(), 4);
        assert_eq!(joined.values().collect::<Vec<_>>(), vec![10, 11, 12, 13]);
    }

    #[test]
    fn collect_mesh_ok() {
        let region: Region = extent!(x = 2, y = 3).into();
        let mesh = (0..6)
            .collect_mesh::<ValueMesh<_>>(region.clone())
            .expect("collect_mesh should succeed");

        assert_eq!(mesh.region().num_ranks(), 6);
        assert_eq!(mesh.values().collect::<Vec<_>>(), vec![0, 1, 2, 3, 4, 5]);
    }

    #[test]
    fn collect_mesh_len_too_short_is_error() {
        let region: Region = extent!(x = 2, y = 3).into();
        let err = (0..5).collect_mesh::<ValueMesh<_>>(region).unwrap_err();

        match err {
            crate::v1::Error::InvalidRankCardinality { expected, actual } => {
                assert_eq!(expected, 6);
                assert_eq!(actual, 5);
            }
            other => panic!("unexpected error: {other:?}"),
        }
    }

    #[test]
    fn collect_mesh_len_too_long_is_error() {
        let region: Region = extent!(x = 2, y = 3).into();
        let err = (0..7).collect_mesh::<ValueMesh<_>>(region).unwrap_err();
        match err {
            crate::v1::Error::InvalidRankCardinality { expected, actual } => {
                assert_eq!(expected, 6);
                assert_eq!(actual, 7);
            }
            other => panic!("unexpected error: {other:?}"),
        }
    }

    #[test]
    fn collect_mesh_from_map_pipeline() {
        let region: Region = extent!(x = 2, y = 2).into();
        let mesh = (0..4)
            .map(|i| i * 10)
            .collect_mesh::<ValueMesh<_>>(region.clone())
            .unwrap();

        assert_eq!(mesh.region().num_ranks(), 4);
        assert_eq!(mesh.values().collect::<Vec<_>>(), vec![0, 10, 20, 30]);
    }

    #[test]
    fn collect_exact_mesh_ok() {
        let region: Region = extent!(x = 2, y = 3).into();
        let mesh = (0..6)
            .collect_exact_mesh::<ValueMesh<_>>(region.clone())
            .expect("collect_exact_mesh should succeed");

        assert_eq!(mesh.region().num_ranks(), 6);
        assert_eq!(mesh.values().collect::<Vec<_>>(), vec![0, 1, 2, 3, 4, 5]);
    }

    #[test]
    fn collect_exact_mesh_len_too_short_is_error() {
        let region: Region = extent!(x = 2, y = 3).into();
        let err = (0..5)
            .collect_exact_mesh::<ValueMesh<_>>(region)
            .unwrap_err();

        match err {
            crate::v1::Error::InvalidRankCardinality { expected, actual } => {
                assert_eq!(expected, 6);
                assert_eq!(actual, 5);
            }
            other => panic!("unexpected error: {other:?}"),
        }
    }

    #[test]
    fn collect_exact_mesh_len_too_long_is_error() {
        let region: Region = extent!(x = 2, y = 3).into();
        let err = (0..7)
            .collect_exact_mesh::<ValueMesh<_>>(region)
            .unwrap_err();

        match err {
            crate::v1::Error::InvalidRankCardinality { expected, actual } => {
                assert_eq!(expected, 6);
                assert_eq!(actual, 7);
            }
            other => panic!("unexpected error: {other:?}"),
        }
    }

    #[test]
    fn collect_exact_mesh_from_map_pipeline() {
        let region: Region = extent!(x = 2, y = 2).into();
        let mesh = (0..4)
            .map(|i| i * 10)
            .collect_exact_mesh::<ValueMesh<_>>(region.clone())
            .unwrap();

        assert_eq!(mesh.region().num_ranks(), 4);
        assert_eq!(mesh.values().collect::<Vec<_>>(), vec![0, 10, 20, 30]);
    }

    #[test]
    fn collect_indexed_ok_shuffled() {
        let region: Region = extent!(x = 2, y = 3).into();
        // (rank, value) in shuffled order; values = rank * 10
        let pairs = vec![(3, 30), (0, 0), (5, 50), (2, 20), (1, 10), (4, 40)];
        let mesh = pairs
            .into_iter()
            .collect_indexed::<ValueMesh<_>>(region.clone())
            .unwrap();

        assert_eq!(mesh.region().num_ranks(), 6);
        assert_eq!(
            mesh.values().collect::<Vec<_>>(),
            vec![0, 10, 20, 30, 40, 50]
        );
    }

    #[test]
    fn collect_indexed_missing_rank_is_error() {
        let region: Region = extent!(x = 2, y = 2).into(); // 4
        // Missing rank 3
        let pairs = vec![(0, 100), (1, 101), (2, 102)];
        let err = pairs
            .into_iter()
            .collect_indexed::<ValueMesh<_>>(region)
            .unwrap_err();

        match err {
            crate::v1::Error::InvalidRankCardinality { expected, actual } => {
                assert_eq!(expected, 4);
                assert_eq!(actual, 3); // Distinct ranks seen.
            }
            other => panic!("unexpected error: {other:?}"),
        }
    }

    #[test]
    fn collect_indexed_out_of_bounds_is_error() {
        let region: Region = extent!(x = 2, y = 2).into(); // 4 (valid ranks 0..=3)
        let pairs = vec![(0, 1), (4, 9)]; // 4 is out-of-bounds
        let err = pairs
            .into_iter()
            .collect_indexed::<ValueMesh<_>>(region)
            .unwrap_err();

        match err {
            crate::v1::Error::InvalidRankCardinality { expected, actual } => {
                assert_eq!(expected, 4);
                assert_eq!(actual, 5); // offending index + 1
            }
            other => panic!("unexpected error: {other:?}"),
        }
    }

    #[test]
    fn collect_indexed_duplicate_last_write_wins() {
        let region: Region = extent!(x = 1, y = 3).into(); // 3
        // rank 1 appears twice; last value should stick
        let pairs = vec![(0, 7), (1, 8), (1, 88), (2, 9)];
        let mesh = pairs
            .into_iter()
            .collect_indexed::<ValueMesh<_>>(region.clone())
            .unwrap();

        assert_eq!(mesh.values().collect::<Vec<_>>(), vec![7, 88, 9]);
    }

    // Indexed collector naïve implementation (for reference).
    #[allow(clippy::result_large_err)]
    fn build_value_mesh_indexed<T>(
        region: Region,
        pairs: impl IntoIterator<Item = (usize, T)>,
    ) -> crate::v1::Result<ValueMesh<T>> {
        let n = region.num_ranks();

        // Buffer for exactly n slots; fill by rank.
        let mut buf: Vec<Option<T>> = std::iter::repeat_with(|| None).take(n).collect();
        let mut filled = 0usize;

        for (rank, value) in pairs {
            if rank >= n {
                // Out-of-bounds: report `expected` = n, `actual` =
                // offending index + 1; i.e. number of ranks implied
                // so far.
                return Err(crate::v1::Error::InvalidRankCardinality {
                    expected: n,
                    actual: rank + 1,
                });
            }
            if buf[rank].is_none() {
                filled += 1;
            }
            buf[rank] = Some(value); // Last write wins.
        }

        if filled != n {
            // Missing ranks: actual = number of distinct ranks seen.
            return Err(crate::v1::Error::InvalidRankCardinality {
                expected: n,
                actual: filled,
            });
        }

        // All present and in-bounds: unwrap and build unchecked.
        let ranks: Vec<T> = buf.into_iter().map(Option::unwrap).collect();
        Ok(ValueMesh::new_unchecked(region, ranks))
    }

    /// This uses the bit-mixing portion of Sebastiano Vigna's
    /// [SplitMix64 algorithm](https://prng.di.unimi.it/splitmix64.c)
    /// to generate a high-quality 64-bit hash from a usize index.
    /// Unlike the full SplitMix64 generator, this is stateless: we
    /// accept an arbitrary `x` as input and apply the mix function to
    /// turn it deterministically into a "randomized" u64. The same
    /// input always produces the same output.
1333    fn hash_key(x: usize) -> u64 {
1334        let mut z = x as u64 ^ 0x9E3779B97F4A7C15;
1335        z = (z ^ (z >> 30)).wrapping_mul(0xBF58476D1CE4E5B9);
1336        z = (z ^ (z >> 27)).wrapping_mul(0x94D049BB133111EB);
1337        z ^ (z >> 31)
1338    }
1339
1340    /// Shuffle a slice deterministically, using a hash of indices as
1341    /// the key.
1342    ///
1343    /// Each position `i` is assigned a pseudo-random 64-bit key (from
1344    /// `key(i)`), the slice is sorted by those keys, and the
1345    /// resulting permutation is applied in place.
1346    ///
1347    /// The permutation is fully determined by the sequence of indices
1348    /// `0..n` and the chosen `key` function. Running it twice on the
1349    /// same input yields the same "random-looking" arrangement.
1350    ///
1351    /// This is going to be used (below) for property tests: it gives
1352    /// the effect of a shuffle without introducing global RNG state,
1353    /// and ensures that duplicate elements are still ordered
1354    /// consistently (so we can test "last write wins" semantics in
1355    /// collectors).
1356    fn pseudo_shuffle<'a, T: 'a>(v: &'a mut [T], key: impl Fn(usize) -> u64 + Copy) {
1357        // Build perm.
1358        let mut with_keys: Vec<(u64, usize)> = (0..v.len()).map(|i| (key(i), i)).collect();
1359        with_keys.sort_by_key(|&(k, _)| k);
1360        let perm: Vec<usize> = with_keys.into_iter().map(|(_, i)| i).collect();
1361
1362        // In-place permutation using a cycle based approach (e.g.
1363        // https://www.geeksforgeeks.org/dsa/permute-the-elements-of-an-array-following-given-order/).
1364        let mut seen = vec![false; v.len()];
1365        for i in 0..v.len() {
1366            if seen[i] {
1367                continue;
1368            }
1369            let mut a = i;
1370            while !seen[a] {
1371                seen[a] = true;
1372                let b = perm[a];
1373                // Short circuit on the cycle's start index.
1374                if b == i {
1375                    break;
1376                }
1377                v.swap(a, b);
1378                a = b;
1379            }
1380        }
1381    }
1382
1383    // Property: Optimized and reference collectors yield the same
1384    // `ValueMesh` on complete inputs, even with duplicates.
1385    //
1386    // - Begin with a complete set of `(rank, value)` pairs covering
1387    //   all ranks of the region.
1388    // - Add extra pairs at arbitrary ranks (up to `extra_len`), which
1389    //   necessarily duplicate existing entries when `extra_len > 0`.
1390    // - Shuffle the combined pairs deterministically.
1391    // - Collect using both the reference (`try_collect_indexed`) and
1392    //   optimized (`try_collect_indexed_opt`) implementations.
1393    //
1394    // Both collectors must succeed and produce identical results.
1395    // This demonstrates that the optimized version preserves
1396    // last-write-wins semantics and agrees exactly with the reference
1397    // behavior.
1398    proptest! {
1399        #[test]
1400        fn try_collect_opt_equivalence(region in gen_region(1..=4, 6), extra_len in 0usize..=12) {
1401            let n = region.num_ranks();
1402
1403            // Start with one pair per rank (coverage guaranteed).
1404            let mut pairs: Vec<(usize, i64)> = (0..n).map(|r| (r, r as i64)).collect();
1405
1406            // Add some extra duplicates of random in-bounds ranks.
1407            // Their values differ so last-write-wins is observable.
1408            let extras = proptest::collection::vec(0..n, extra_len)
1409                .new_tree(&mut proptest::test_runner::TestRunner::default())
1410                .unwrap()
1411                .current();
1412            for (k, r) in extras.into_iter().enumerate() {
1413                pairs.push((r, (n as i64) + (k as i64)));
1414            }
1415
1416            // Deterministic "shuffle" to fix iteration order across
1417            // both collectors.
1418            pseudo_shuffle(&mut pairs, hash_key);
1419
1420            // Reference vs optimized.
1421            let mesh_ref = build_value_mesh_indexed(region.clone(), pairs.clone()).unwrap();
1422            let mesh_opt = pairs.into_iter().collect_indexed::<ValueMesh<_>>(region.clone()).unwrap();
1423
1424            prop_assert_eq!(mesh_ref.region(), mesh_opt.region());
1425            prop_assert_eq!(mesh_ref.values().collect::<Vec<_>>(), mesh_opt.values().collect::<Vec<_>>());
1426        }
1427    }

    // Property: Optimized and reference collectors report identical
    // errors when ranks are missing.
    //
    // - Begin with a complete set of `(rank, value)` pairs.
    // - Remove one rank so coverage is incomplete.
    // - Shuffle deterministically.
    // - Collect with both implementations.
    //
    // Both must fail with `InvalidRankCardinality` describing the
    // same expected vs. actual counts.
    proptest! {
        #[test]
        fn try_collect_opt_missing_rank_errors_match(region in gen_region(1..=4, 6)) {
            let n = region.num_ranks();
            // Base complete.
            let mut pairs: Vec<(usize, i64)> = (0..n).map(|r| (r, r as i64)).collect();
            // Drop one distinct rank.
            if n > 0 {
                let drop_idx = 0usize; // Deterministic, fine for the property.
                pairs.remove(drop_idx);
            }
            // Shuffle deterministically.
            pseudo_shuffle(&mut pairs, hash_key);

            let ref_err = build_value_mesh_indexed(region.clone(), pairs.clone()).unwrap_err();
            let opt_err = pairs.into_iter().collect_indexed::<ValueMesh<_>>(region).unwrap_err();
            prop_assert_eq!(format!("{ref_err:?}"), format!("{opt_err:?}"));
        }
    }
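
    // A concrete companion to the property above (an illustrative
    // addition): with rank 0 absent from a 3-rank region, collection
    // must fail. Only the error outcome is asserted here, not its
    // exact shape.
    #[test]
    fn collect_indexed_missing_rank_concrete() {
        let region: Region = extent!(n = 3).into();
        let pairs = vec![(1usize, 1i64), (2, 2)];
        assert!(
            pairs
                .into_iter()
                .collect_indexed::<ValueMesh<_>>(region)
                .is_err()
        );
    }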

    // Property: Optimized and reference collectors report identical
    // errors when given out-of-bounds ranks.
    //
    // - Construct a set of `(rank, value)` pairs.
    // - Include at least one pair whose rank is ≥
    //   `region.num_ranks()`.
    // - Shuffle deterministically.
    // - Collect with both implementations.
    //
    // Both must fail with `InvalidRankCardinality`, and the reported
    // error values must match exactly.
    proptest! {
        #[test]
        fn try_collect_opt_out_of_bound_errors_match(region in gen_region(1..=4, 6)) {
            let n = region.num_ranks();
            // One valid pair, then one out-of-bounds.
            let mut pairs = vec![(0usize, 0i64), (n, 123i64)];
            pseudo_shuffle(&mut pairs, hash_key);

            let ref_err = build_value_mesh_indexed(region.clone(), pairs.clone()).unwrap_err();
            let opt_err = pairs.into_iter().collect_indexed::<ValueMesh<_>>(region).unwrap_err();
            prop_assert_eq!(format!("{ref_err:?}"), format!("{opt_err:?}"));
        }
    }

    #[test]
    fn map_into_preserves_region_and_order() {
        let region: Region = extent!(rows = 2, cols = 3).into();
        let vm = ValueMesh::new_unchecked(region.clone(), vec![0, 1, 2, 3, 4, 5]);

        let doubled: ValueMesh<_> = vm.map_into(|x| x * 2);
        assert_eq!(doubled.region, region);
        assert_eq!(
            doubled.values().collect::<Vec<_>>(),
            vec![0, 2, 4, 6, 8, 10]
        );
    }

    #[test]
    fn map_into_ref_borrows_and_preserves() {
        let region: Region = extent!(n = 4).into();
        let vm = ValueMesh::new_unchecked(
            region.clone(),
            vec!["a".to_string(), "b".into(), "c".into(), "d".into()],
        );

        let lens: ValueMesh<_> = vm.map_into(|s| s.len());
        assert_eq!(lens.region, region);
        assert_eq!(lens.values().collect::<Vec<_>>(), vec![1, 1, 1, 1]);
    }

    #[test]
    fn try_map_into_short_circuits_on_error() {
        let region = extent!(n = 4).into();
        let vm = ValueMesh::new_unchecked(region, vec![1, 2, 3, 4]);

        let res: Result<ValueMesh<i32>, &'static str> =
            vm.try_map_into(|x| if x == &3 { Err("boom") } else { Ok(x + 10) });

        assert!(res.is_err());
        assert_eq!(res.unwrap_err(), "boom");
    }
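
    // The success path as a counterpart to the short-circuit test
    // above (an illustrative addition): when no element fails, the
    // fully mapped mesh is returned.
    #[test]
    fn try_map_into_ok_path() {
        let region: Region = extent!(n = 3).into();
        let vm = ValueMesh::new_unchecked(region.clone(), vec![1, 2, 3]);

        let res: Result<ValueMesh<i32>, &'static str> = vm.try_map_into(|x| Ok(x * 10));
        let out = res.unwrap();
        assert_eq!(out.region, region);
        assert_eq!(out.values().collect::<Vec<_>>(), vec![10, 20, 30]);
    }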

    // -- Helper to poll `core::future::Ready` without a runtime.
    fn noop_waker() -> Waker {
        fn clone(_: *const ()) -> RawWaker {
            RawWaker::new(std::ptr::null(), &VTABLE)
        }
        fn wake(_: *const ()) {}
        fn wake_by_ref(_: *const ()) {}
        fn drop(_: *const ()) {}
        static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, wake, wake_by_ref, drop);
        // SAFETY: The raw waker never dereferences its data pointer
        // (`null`), and all vtable fns are no-ops. It's only used to
        // satisfy `Context` for polling already-ready futures in
        // tests.
        unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
    }
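
    // Note: on toolchains where `std::task::Waker::noop()` is
    // available (stabilized in Rust 1.85), that constant no-op waker
    // could replace this hand-rolled vtable.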

    fn poll_now<F: Future>(mut fut: F) -> F::Output {
        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);
        // SAFETY: `fut` is a local stack variable that we never move
        // after pinning, and we only use it to poll immediately
        // within this scope. This satisfies the invariants of
        // `Pin::new_unchecked`.
        let mut fut = unsafe { Pin::new_unchecked(&mut fut) };
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(v) => v,
            Poll::Pending => unreachable!("Ready futures must complete immediately"),
        }
    }
    // --

    #[test]
    fn map_into_ready_futures() {
        let region: Region = extent!(r = 2, c = 2).into();
        let vm = ValueMesh::new_unchecked(region.clone(), vec![10, 20, 30, 40]);

        // Map to `core::future::Ready` futures.
        let ready: ValueMesh<core::future::Ready<_>> =
            vm.map_into(|x| core::future::ready(x + 1));
        assert_eq!(ready.region, region);

        // Drive the ready futures without a runtime and collect results.
        let results: Vec<_> = ready.values().map(|f| poll_now(f.clone())).collect();
        assert_eq!(results, vec![11, 21, 31, 41]);
    }

    #[test]
    fn map_into_single_element_mesh() {
        let region: Region = extent!(n = 1).into();
        let vm = ValueMesh::new_unchecked(region.clone(), vec![7]);

        let out: ValueMesh<_> = vm.map_into(|x| x * x);
        assert_eq!(out.region, region);
        assert_eq!(out.values().collect::<Vec<_>>(), vec![49]);
    }

    #[test]
    fn map_into_ref_with_non_clone_field() {
        // A type that intentionally does NOT implement Clone.
        #[derive(Debug, PartialEq, Eq)]
        struct NotClone(i32);

        let region: Region = extent!(x = 3).into();
        let values = vec![(10, NotClone(1)), (20, NotClone(2)), (30, NotClone(3))];
        let mesh: ValueMesh<(i32, NotClone)> =
            values.into_iter().collect_mesh(region.clone()).unwrap();

        let projected: ValueMesh<i32> = mesh.map_into(|t| t.0);
        assert_eq!(projected.values().collect::<Vec<_>>(), vec![10, 20, 30]);
        assert_eq!(projected.region(), &region);
    }

    #[test]
    fn rle_roundtrip_all_equal() {
        let region: Region = extent!(n = 6).into();
        let mut vm = ValueMesh::new_unchecked(region.clone(), vec![42; 6]);

        // Compress and ensure logical equality is preserved.
        vm.compress_adjacent_in_place();
        let collected: Vec<_> = vm.values().collect();
        assert_eq!(collected, vec![42, 42, 42, 42, 42, 42]);

        // Random access still works.
        for i in 0..region.num_ranks() {
            assert_eq!(vm.get(i), Some(&42));
        }
        assert_eq!(vm.get(region.num_ranks()), None); // out-of-bounds
    }
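
    // An internal-shape companion to the round-trip above (an
    // illustrative addition; it assumes, as `from_dense_builds_and_compresses`
    // below also does, that compression materializes `Rep::Compressed`):
    // an all-equal mesh should collapse to one table entry and one run.
    #[test]
    fn rle_all_equal_single_run_shape() {
        let region: Region = extent!(n = 6).into();
        let mut vm = ValueMesh::new_unchecked(region, vec![42; 6]);
        vm.compress_adjacent_in_place();

        match &vm.rep {
            Rep::Compressed { table, runs } => {
                assert_eq!(table.len(), 1);
                assert_eq!(runs.len(), 1);
            }
            _ => panic!("expected compressed representation"),
        }
    }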

    #[test]
    fn rle_roundtrip_alternating() {
        let region: Region = extent!(n = 6).into();
        let original = vec![1, 2, 1, 2, 1, 2];
        let mut vm = ValueMesh::new_unchecked(region.clone(), original.clone());

        vm.compress_adjacent_in_place();
        let collected: Vec<_> = vm.values().collect();
        assert_eq!(collected, original);

        // Spot-check random access after compression.
        assert_eq!(vm.get(0), Some(&1));
        assert_eq!(vm.get(1), Some(&2));
        assert_eq!(vm.get(3), Some(&2));
        assert_eq!(vm.get(5), Some(&2));
    }

    #[test]
    fn rle_roundtrip_blocky_and_slice() {
        // Blocks: 0,0,0 | 1,1 | 2,2,2,2 | 3
        let region: Region = extent!(n = 10).into();
        let original = vec![0, 0, 0, 1, 1, 2, 2, 2, 2, 3];
        let mut vm = ValueMesh::new_unchecked(region.clone(), original.clone());

        vm.compress_adjacent_in_place();
        let collected: Vec<_> = vm.values().collect();
        assert_eq!(collected, original);

        // Slice a middle subregion [3..8) → [1,1,2,2,2].
        let sub_region = region.range("n", 3..8).unwrap();
        let sliced = vm.sliced(sub_region);
        let sliced_vec: Vec<_> = sliced.values().collect();
        assert_eq!(sliced_vec, vec![1, 1, 2, 2, 2]);
    }

    #[test]
    fn rle_idempotent_noop_on_second_call() {
        let region: Region = extent!(n = 7).into();
        let original = vec![9, 9, 9, 8, 8, 9, 9];
        let mut vm = ValueMesh::new_unchecked(region.clone(), original.clone());

        vm.compress_adjacent_in_place();
        let once: Vec<_> = vm.values().collect();
        assert_eq!(once, original);

        // Calling again should be a no-op and still yield identical
        // values.
        vm.compress_adjacent_in_place();
        let twice: Vec<_> = vm.values().collect();
        assert_eq!(twice, original);
    }

    #[test]
    fn rle_works_after_build_indexed() {
        // Build with shuffled pairs, then compress and verify
        // semantics.
        let region: Region = extent!(x = 2, y = 3).into(); // 6 ranks
        let pairs = vec![(3, 30), (0, 0), (5, 50), (2, 20), (1, 10), (4, 40)];
        let mut vm = pairs
            .into_iter()
            .collect_indexed::<ValueMesh<_>>(region.clone())
            .unwrap();

        // Should compress to 6 runs of length 1; still must
        // round-trip.
        vm.compress_adjacent_in_place();
        let collected: Vec<_> = vm.values().collect();
        assert_eq!(collected, vec![0, 10, 20, 30, 40, 50]);
        // Spot-check get().
        assert_eq!(vm.get(4), Some(&40));
    }

    #[test]
    fn rle_handles_singleton_mesh() {
        let region: Region = extent!(n = 1).into();
        let mut vm = ValueMesh::new_unchecked(region.clone(), vec![123]);

        vm.compress_adjacent_in_place();
        let collected: Vec<_> = vm.values().collect();
        assert_eq!(collected, vec![123]);
        assert_eq!(vm.get(0), Some(&123));
        assert_eq!(vm.get(1), None);
    }

    #[test]
    fn test_dense_round_trip() {
        // Build a simple dense mesh of 5 integers.
        let region: Region = extent!(x = 5).into();
        let dense = ValueMesh::new(region.clone(), vec![1, 2, 3, 4, 5]).unwrap();

        let json = serde_json::to_string_pretty(&dense).unwrap();
        let restored: ValueMesh<i32> = serde_json::from_str(&json).unwrap();

        assert_eq!(dense, restored);

        // Dense meshes should stay dense on the wire: check the
        // tagged variant.
        let v: serde_json::Value = serde_json::from_str(&json).unwrap();
        // The enum tag is nested: {"rep": {"rep": "dense", ...}}.
        let tag = v
            .get("rep")
            .and_then(|o| o.get("rep"))
            .and_then(|s| s.as_str());
        assert_eq!(tag, Some("dense"));
    }

    #[test]
    fn test_compressed_round_trip() {
        // Build a dense mesh, compress it, and verify it stays
        // compressed on the wire.
        let region: Region = extent!(x = 10).into();
        let mut mesh = ValueMesh::new(region.clone(), vec![1, 1, 1, 2, 2, 3, 3, 3, 3, 3]).unwrap();
        mesh.compress_adjacent_in_place();

        let json = serde_json::to_string_pretty(&mesh).unwrap();
        let restored: ValueMesh<i32> = serde_json::from_str(&json).unwrap();

        // Logical equality preserved.
        assert_eq!(mesh, restored);

        // Compressed meshes should stay compressed on the wire.
        let v: serde_json::Value = serde_json::from_str(&json).unwrap();
        // The enum tag is nested: {"rep": {"rep": "compressed", ...}}.
        let tag = v
            .get("rep")
            .and_then(|o| o.get("rep"))
            .and_then(|s| s.as_str());
        assert_eq!(tag, Some("compressed"));
    }

    #[test]
    fn test_stable_run_encoding() {
        let run = Run::new(0, 10, 42);
        let json = serde_json::to_string(&run).unwrap();
        let decoded: Run = serde_json::from_str(&json).unwrap();

        assert_eq!(run, decoded);
        assert_eq!(run.start, 0);
        assert_eq!(run.end, 10);
        assert_eq!(run.id, 42);

        // Ensure conversion back to `(Range<usize>, u32)` works.
        let (range, id): (Range<usize>, u32) = run.try_into().unwrap();
        assert_eq!(range, 0..10);
        assert_eq!(id, 42);
    }

    #[test]
    fn from_single_builds_single_run() {
        let region: Region = extent!(n = 6).into();
        let vm = ValueMesh::from_single(region.clone(), 7);

        assert_eq!(vm.region(), &region);
        assert_eq!(vm.values().collect::<Vec<_>>(), vec![7, 7, 7, 7, 7, 7]);
        assert_eq!(vm.get(0), Some(&7));
        assert_eq!(vm.get(5), Some(&7));
        assert_eq!(vm.get(6), None);
    }

    #[test]
    fn from_default_builds_with_default_value() {
        let region: Region = extent!(n = 6).into();
        let vm = ValueMesh::<i32>::from_default(region.clone());

        assert_eq!(vm.region(), &region);
        // i32::default() == 0
        assert_eq!(vm.values().collect::<Vec<_>>(), vec![0, 0, 0, 0, 0, 0]);
        assert_eq!(vm.get(0), Some(&0));
        assert_eq!(vm.get(5), Some(&0));
    }

    #[test]
    fn test_default_vs_single_equivalence() {
        let region: Region = extent!(x = 4).into();
        let d1 = ValueMesh::<i32>::from_default(region.clone());
        let d2 = ValueMesh::from_single(region.clone(), 0);
        assert_eq!(d1, d2);
    }

    #[test]
    fn build_from_ranges_with_default_basic() {
        let region: Region = extent!(n = 10).into();
        let vm = ValueMesh::from_ranges_with_default(
            region.clone(),
            0, // default
            vec![(2..4, 1), (6..9, 2)],
        )
        .unwrap();

        assert_eq!(vm.region(), &region);
        assert_eq!(
            vm.values().collect::<Vec<_>>(),
            vec![0, 0, 1, 1, 0, 0, 2, 2, 2, 0]
        );

        // Internal shape: [0..2)->0, [2..4)->1, [4..6)->0, [6..9)->2,
        // [9..10)->0.
        if let Rep::Compressed { table, runs } = &vm.rep {
            // Table is small and de-duplicated.
            assert!(table.len() <= 3);
            assert_eq!(runs.len(), 5);
        } else {
            panic!("expected compressed");
        }
    }

    #[test]
    fn build_from_ranges_with_default_edge_cases() {
        let region: Region = extent!(n = 5).into();

        // Full override covers the entire region.
        let vm = ValueMesh::from_ranges_with_default(region.clone(), 9, vec![(0..5, 3)]).unwrap();
        assert_eq!(vm.values().collect::<Vec<_>>(), vec![3, 3, 3, 3, 3]);

        // Adjacent overrides and default gaps.
        let vm = ValueMesh::from_ranges_with_default(region.clone(), 0, vec![(1..2, 7), (2..4, 7)])
            .unwrap();
        assert_eq!(vm.values().collect::<Vec<_>>(), vec![0, 7, 7, 7, 0]);

        // Empty region.
        let empty_region: Region = extent!(n = 0).into();
        let vm = ValueMesh::from_ranges_with_default(empty_region.clone(), 42, vec![]).unwrap();
        assert_eq!(vm.values().collect::<Vec<_>>(), Vec::<i32>::new());
    }

    #[test]
    fn from_dense_builds_and_compresses() {
        let region: Region = extent!(n = 6).into();
        let mesh = ValueMesh::from_dense(region.clone(), vec![1, 1, 2, 2, 3, 3]).unwrap();

        assert_eq!(mesh.region(), &region);
        assert!(matches!(mesh.rep, Rep::Compressed { .. }));
        assert_eq!(mesh.values().collect::<Vec<_>>(), vec![1, 1, 2, 2, 3, 3]);

        // Spot-check indexing.
        assert_eq!(mesh.get(0), Some(&1));
        assert_eq!(mesh.get(3), Some(&2));
        assert_eq!(mesh.get(5), Some(&3));
    }

    #[test]
    fn merge_from_overlay_basic() {
        // Base mesh with three contiguous runs.
        let region: Region = extent!(n = 8).into();
        let mut mesh = ValueMesh::from_dense(region.clone(), vec![1, 1, 1, 2, 2, 2, 3, 3]).unwrap();

        // Overlay replaces the middle segment [2..6) with 9s.
        let overlay = ValueOverlay::try_from_runs(vec![(2..6, 9)]).unwrap();

        mesh.merge_from_overlay(overlay).unwrap();

        // Materialize back into ranges to inspect.
        let out = mesh.materialized_runs();

        // Expected: left prefix (0..2)=1, replaced middle (2..6)=9, tail (6..8)=3.
        assert_eq!(out, vec![(0..2, 1), (2..6, 9), (6..8, 3)]);
    }
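
    // A boundary case for the merge above (an illustrative addition):
    // an overlay spanning the whole region leaves nothing of the base
    // mesh, and the result coalesces into a single run.
    #[test]
    fn merge_from_overlay_full_cover() {
        let region: Region = extent!(n = 8).into();
        let mut mesh = ValueMesh::from_dense(region, vec![1, 1, 1, 2, 2, 2, 3, 3]).unwrap();

        let overlay = ValueOverlay::try_from_runs(vec![(0..8, 9)]).unwrap();
        mesh.merge_from_overlay(overlay).unwrap();

        assert_eq!(mesh.materialized_runs(), vec![(0..8, 9)]);
    }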

    #[test]
    fn merge_from_overlay_multiple_spans() {
        // Build a mesh with alternating runs.
        let region: Region = extent!(m = 12).into();
        let mut mesh =
            ValueMesh::from_dense(region.clone(), vec![1, 1, 1, 2, 2, 2, 3, 3, 3, 4, 4, 4])
                .unwrap();

        // The overlay has a run that spans across the boundary of two
        // left runs and another disjoint run later.
        let overlay = ValueOverlay::try_from_runs(vec![(2..6, 9), (9..11, 8)]).unwrap();

        mesh.merge_from_overlay(overlay).unwrap();
        let out = mesh.materialized_runs();

        // Expected after merge and re-compression:
        // (0..2,1)   untouched
        // (2..6,9)   overwrites part of the [1] run and all of the [2] run
        // (6..9,3)   left tail survives
        // (9..11,8)  overwrite inside the [4] run
        // (11..12,4) leftover tail
        assert_eq!(
            out,
            vec![(0..2, 1), (2..6, 9), (6..9, 3), (9..11, 8), (11..12, 4)]
        );
    }

    #[test]
    fn merge_from_overlay_crosses_row_boundary() {
        // A 2 x 5 region -> 10 linear ranks in row-major order.
        let region: Region = extent!(rows = 2, cols = 5).into();

        // Dense values laid out row-major:
        // row 0: [1, 1, 1, 2, 2]
        // row 1: [3, 3, 4, 4, 4]
        let mut mesh =
            ValueMesh::from_dense(region.clone(), vec![1, 1, 1, 2, 2, 3, 3, 4, 4, 4]).unwrap();

        // An overlay that crosses the row boundary:
        // linear ranks [3..7) -> 9
        //   - tail of row 0: indices 3,4 (the two 2s)
        //   - head of row 1: indices 5,6 (the two 3s)
        let overlay = ValueOverlay::try_from_runs(vec![(3..7, 9)]).unwrap();

        mesh.merge_from_overlay(overlay).unwrap();

        // After the merge, the dense view should be:
        // [1,1,1, 9,9, 9,9, 4,4,4]
        let flat: Vec<_> = mesh.values().collect();
        assert_eq!(flat, vec![1, 1, 1, 9, 9, 9, 9, 4, 4, 4]);

        // And the materialized runs should reflect that:
        // (0..3,1) | (3..7,9) | (7..10,4)
        let runs = mesh.materialized_runs();
        assert_eq!(runs, vec![(0..3, 1), (3..7, 9), (7..10, 4)]);
    }
}