hyperactor_mesh/v1/value_mesh.rs

/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 * All rights reserved.
 *
 * This source code is licensed under the BSD-style license found in the
 * LICENSE file in the root directory of this source tree.
 */

use std::cmp::Ordering;
use std::collections::HashMap;
use std::collections::hash_map::Entry;
use std::hash::Hash;
use std::marker::PhantomData;
use std::mem;
use std::mem::MaybeUninit;
use std::ops::Range;
use std::ptr;
use std::ptr::NonNull;

use futures::Future;
use hyperactor::accum::Accumulator;
use hyperactor::accum::CommReducer;
use hyperactor::accum::ReducerFactory;
use hyperactor::accum::ReducerSpec;
use ndslice::extent;
use ndslice::view;
use ndslice::view::Ranked;
use ndslice::view::Region;
use serde::Deserialize;
use serde::Serialize;
use typeuri::Named;

mod rle;
mod value_overlay;
pub use value_overlay::BuildError;
pub use value_overlay::ValueOverlay;

/// A mesh of values, one per rank in `region`.
///
/// The internal representation (`rep`) may be dense or compressed,
/// but externally the mesh always behaves as a complete mapping from
/// rank index → value.
///
/// # Invariants
/// - Complete: every rank in `region` has exactly one value.
/// - Order: iteration and indexing follow the region's linearization.
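///
/// # Examples
///
/// An illustrative sketch, mirroring the tests below (not compiled
/// as a doctest):
///
/// ```ignore
/// use ndslice::extent;
/// use ndslice::view::Ranked;
///
/// let region: Region = extent!(replica = 2, gpu = 3).into();
/// // One value per rank, in the region's linearization order.
/// let mesh = ValueMesh::new(region, (0..6).collect()).unwrap();
/// assert_eq!(mesh.get(4), Some(&4));
/// assert_eq!(mesh.get(6), None); // out of bounds
/// ```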
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)] // only if T implements
pub struct ValueMesh<T> {
    /// The logical multidimensional domain of the mesh.
    ///
    /// Determines the number of ranks (`region.num_ranks()`) and the
    /// order in which they are traversed.
    region: Region,

    /// The underlying storage representation.
    ///
    /// - `Rep::Dense` stores a `Vec<T>` with one value per rank.
    /// - `Rep::Compressed` stores a run-length encoded table of
    ///   unique values plus `(Range<usize>, u32)` pairs describing
    ///   contiguous runs of identical values.
    ///
    /// The representation is an internal optimization detail; all
    /// public APIs (e.g. `values()`, `get()`, slicing) behave as if
    /// the mesh were dense.
    rep: Rep<T>,
}

/// A single run-length–encoded (RLE) segment within a [`ValueMesh`].
///
/// Each `Run` represents a contiguous range of ranks `[start, end)`
/// that all share the same value, referenced indirectly via a table
/// index `id`. This allows compact storage of large regions with
/// repeated values.
///
/// Runs are serialized in a stable, portable format using `u64` for
/// range bounds (`start`, `end`) to avoid platform-dependent `usize`
/// encoding differences.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
struct Run {
    /// Inclusive start of the contiguous range of ranks (0-based).
    start: u64,
    /// Exclusive end of the contiguous range of ranks (0-based).
    end: u64,
    /// Index into the value table for this run's shared value.
    id: u32,
}

impl Run {
    /// Creates a new `Run` covering ranks `[start, end)` that all
    /// share the same table entry `id`.
    ///
    /// Converts `usize` bounds to `u64` for stable serialization.
    fn new(start: usize, end: usize, id: u32) -> Self {
        Self {
            start: start as u64,
            end: end as u64,
            id,
        }
    }
}

impl TryFrom<Run> for (Range<usize>, u32) {
    type Error = &'static str;

    /// Converts a serialized [`Run`] back into its in-memory form
    /// `(Range<usize>, u32)`.
    ///
    /// Performs checked conversion of the 64-bit wire fields back
    /// into `usize` indices, returning an error if either bound
    /// exceeds the platform's addressable range. This ensures safe
    /// round-tripping between the serialized wire format and native
    /// representation.
    #[allow(clippy::result_large_err)]
    fn try_from(r: Run) -> Result<Self, Self::Error> {
        let start = usize::try_from(r.start).map_err(|_| "run.start too large")?;
        let end = usize::try_from(r.end).map_err(|_| "run.end too large")?;
        Ok((start..end, r.id))
    }
}

/// Internal storage representation for a [`ValueMesh`].
///
/// This enum abstracts how the per-rank values are stored.
/// Externally, both variants behave identically — the difference is
/// purely in memory layout and access strategy.
///
/// - [`Rep::Dense`] stores one value per rank, directly.
/// - [`Rep::Compressed`] stores a compact run-length-encoded form,
///   reusing identical values across contiguous ranks.
///
/// Users of [`ValueMesh`] normally never interact with `Rep`
/// directly; all iteration and slicing APIs present a dense logical
/// view.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)] // only if T implements
enum Rep<T> {
    /// Fully expanded representation: one element per rank.
    ///
    /// The length of `values` is always equal to
    /// `region.num_ranks()`. This form is simple and fast for
    /// iteration and mutation but uses more memory when large runs of
    /// repeated values are present.
    Dense {
        /// Flat list of values, one per rank in the region's
        /// linearization order.
        values: Vec<T>,
    },

    /// Run-length-encoded representation.
    ///
    /// Each run `(Range<usize>, id)` indicates that the ranks within
    /// `Range` (half-open `[start, end)`) share the same value at
    /// `table[id]`. The `table` stores each distinct value once.
    ///
    /// # Invariants
    /// - Runs are non-empty and contiguous (`r.start < r.end`).
    /// - Runs collectively cover `0..region.num_ranks()` with no gaps
    ///   or overlaps.
    /// - `id` indexes into `table` (`id < table.len()`).
    Compressed {
        /// The deduplicated set of unique values referenced by runs.
        table: Vec<T>,

        /// List of `(range, table_id)` pairs describing contiguous
        /// runs of identical values in region order.
        runs: Vec<Run>,
    },
}

// At this time, Default is used primarily to satisfy the mailbox
// Accumulator bound. It constructs an empty (zero-rank) mesh. Other
// contexts may also use this as a generic "empty mesh" initializer
// when a concrete region is not yet known.
impl<T> Default for ValueMesh<T> {
    fn default() -> Self {
        Self::empty()
    }
}

impl<T> ValueMesh<T> {
    /// Returns an *empty* mesh: a 1-dimensional region of length 0.
    ///
    /// This differs from a *dimensionless* (0-D) region, which
    /// represents a single scalar element. A zero-length 1-D region
    /// has **no elements at all**, making it the natural `Default`
    /// placeholder for accumulator state initialization.
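    ///
    /// A minimal sketch (not a doctest):
    ///
    /// ```ignore
    /// let mesh: ValueMesh<u32> = ValueMesh::empty();
    /// assert_eq!(mesh.region().num_ranks(), 0);
    /// ```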
    pub fn empty() -> Self {
        // zero-rank region; no constraints on T
        let region = extent!(r = 0).into();
        Self::new_unchecked(region, Vec::<T>::new())
    }

    /// Creates a new `ValueMesh` for `region` with exactly one value
    /// per rank.
    ///
    /// # Invariants
    /// This constructor validates that the number of provided values
    /// (`ranks.len()`) matches the region's cardinality
    /// (`region.num_ranks()`). A value mesh must be complete: every
    /// rank in `region` has a corresponding `T`.
    ///
    /// # Errors
    /// Returns [`Error::InvalidRankCardinality`] if `ranks.len() !=
    /// region.num_ranks()`.
    #[allow(clippy::result_large_err)]
    pub(crate) fn new(region: Region, ranks: Vec<T>) -> crate::v1::Result<Self> {
        let (actual, expected) = (ranks.len(), region.num_ranks());
        if actual != expected {
            return Err(crate::v1::Error::InvalidRankCardinality { expected, actual });
        }
        Ok(Self {
            region,
            rep: Rep::Dense { values: ranks },
        })
    }

    /// Constructs a `ValueMesh` without checking cardinality. Caller
    /// must ensure `ranks.len() == region.num_ranks()`.
    #[inline]
    pub(crate) fn new_unchecked(region: Region, ranks: Vec<T>) -> Self {
        debug_assert_eq!(region.num_ranks(), ranks.len());
        Self {
            region,
            rep: Rep::Dense { values: ranks },
        }
    }
}

impl<T: Clone> ValueMesh<T> {
    /// Builds a `ValueMesh` that assigns the single value `s` to
    /// every rank in `region`, without materializing a dense
    /// `Vec<T>`. The result is stored in compressed (RLE) form as a
    /// single run `[0..N)`.
    ///
    /// If `region.num_ranks() == 0`, the mesh contains no runs (and
    /// an empty table), regardless of `s`.
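    ///
    /// # Examples
    ///
    /// A sketch of the RLE behavior (not a doctest):
    ///
    /// ```ignore
    /// let region: Region = extent!(n = 4).into();
    /// let mesh = ValueMesh::from_single(region, "idle");
    /// // Stored as one run [0..4) over a single-entry table;
    /// // reads behave as if dense.
    /// assert_eq!(mesh.get(3), Some(&"idle"));
    /// ```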
    pub fn from_single(region: Region, s: T) -> Self {
        let n = region.num_ranks();
        if n == 0 {
            return Self {
                region,
                rep: Rep::Compressed {
                    table: Vec::new(),
                    runs: Vec::new(),
                },
            };
        }

        let table = vec![s];
        let runs = vec![Run::new(0, n, 0)];
        Self {
            region,
            rep: Rep::Compressed { table, runs },
        }
    }
}

impl<T: Clone + Default> ValueMesh<T> {
    /// Builds a [`ValueMesh`] covering the region, filled with
    /// `T::default()`.
    ///
    /// Equivalent to `ValueMesh::from_single(region, T::default())`.
    pub fn from_default(region: Region) -> Self {
        ValueMesh::<T>::from_single(region, T::default())
    }
}

impl<T: Eq + Hash> ValueMesh<T> {
    /// Builds a compressed mesh from a default value and a set of
    /// disjoint ranges that override the default.
    ///
    /// - `ranges` may be in any order; they must be non-empty,
    ///   in-bounds, and non-overlapping.
    /// - Unspecified ranks are filled with `default`.
    /// - Result is stored in RLE form; no dense `Vec<T>` is
    ///   materialized.
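    ///
    /// # Examples
    ///
    /// A sketch (not a doctest) of overriding two disjoint ranges:
    ///
    /// ```ignore
    /// let region: Region = extent!(n = 8).into();
    /// let mesh = ValueMesh::from_ranges_with_default(
    ///     region,
    ///     0u32,                       // default for unspecified ranks
    ///     vec![(1..3, 7), (5..6, 9)], // disjoint overrides
    /// )?;
    /// // Logical contents: [0, 7, 7, 0, 0, 9, 0, 0]
    /// assert_eq!(mesh.get(2), Some(&7));
    /// assert_eq!(mesh.get(4), Some(&0));
    /// ```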
    #[allow(clippy::result_large_err)]
    pub fn from_ranges_with_default(
        region: Region,
        default: T,
        mut ranges: Vec<(Range<usize>, T)>,
    ) -> crate::v1::Result<Self> {
        let n = region.num_ranks();

        if n == 0 {
            return Ok(Self {
                region,
                rep: Rep::Compressed {
                    table: Vec::new(),
                    runs: Vec::new(),
                },
            });
        }

        // Validate: non-empty, in-bounds; then sort.
        for (r, _) in &ranges {
            if r.is_empty() {
                return Err(crate::v1::Error::InvalidRankCardinality {
                    expected: n,
                    actual: 0,
                }); // TODO: this surfaces the error but it's not a great fit
            }
            if r.end > n {
                return Err(crate::v1::Error::InvalidRankCardinality {
                    expected: n,
                    actual: r.end,
                });
            }
        }
        ranges.sort_by_key(|(r, _)| (r.start, r.end));

        // Validate: non-overlapping.
        for w in ranges.windows(2) {
            let (a, _) = &w[0];
            let (b, _) = &w[1];
            if a.end > b.start {
                // Overlap
                return Err(crate::v1::Error::InvalidRankCardinality {
                    expected: n,
                    actual: b.start, // TODO: this surfaces the error but is a bad fit
                });
            }
        }

        // Internal index: value -> table id (assigned once).
        let mut index: HashMap<T, u32> = HashMap::with_capacity(1 + ranges.len());
        let mut next_id: u32 = 0;

        // Helper: assign or reuse an id by value, taking ownership of
        // v.
        let id_of = |v: T, index: &mut HashMap<T, u32>, next_id: &mut u32| -> u32 {
            match index.entry(v) {
                Entry::Occupied(o) => *o.get(),
                Entry::Vacant(vac) => {
                    let id = *next_id;
                    *next_id += 1;
                    vac.insert(id);
                    id
                }
            }
        };

        let default_id = id_of(default, &mut index, &mut next_id);

        let mut runs: Vec<Run> = Vec::with_capacity(1 + 2 * ranges.len());
        let mut cursor = 0usize;

        for (r, v) in ranges.into_iter() {
            // Fill default gap if any.
            if cursor < r.start {
                runs.push(Run::new(cursor, r.start, default_id));
            }
            // Override block.
            let id = id_of(v, &mut index, &mut next_id);
            runs.push(Run::new(r.start, r.end, id));
            cursor = r.end;
        }

        // Trailing default tail.
        if cursor < n {
            runs.push(Run::new(cursor, n, default_id));
        }

        // Materialize table in id order without cloning: move keys
        // out of the map.
        let mut table_slots: Vec<Option<T>> = Vec::new();
        table_slots.resize_with(next_id as usize, || None);

        for (t, id) in index.into_iter() {
            table_slots[id as usize] = Some(t);
        }

        let table: Vec<T> = table_slots
            .into_iter()
            .map(|o| o.expect("every id must be assigned"))
            .collect();

        Ok(Self {
            region,
            rep: Rep::Compressed { table, runs },
        })
    }

    /// Builds a [`ValueMesh`] from a fully materialized dense vector
    /// of per-rank values, then compresses it into run-length-encoded
    /// form.
    ///
    /// This constructor is intended for callers that already have one
    /// value per rank (e.g. computed or received data) but wish to
    /// store it efficiently.
    ///
    /// # Parameters
    /// - `region`: The logical region describing the mesh's shape and
    ///   rank order.
    /// - `values`: A dense vector of values, one per rank in
    ///   `region`.
    ///
    /// # Returns
    /// A [`ValueMesh`] stored in run-length-encoded (`Compressed`)
    /// form, with adjacent equal elements coalesced into single runs.
    ///
    /// # Errors
    /// Returns an error if the number of provided `values` does not
    /// match the number of ranks in `region`.
    ///
    /// # Examples
    /// ```ignore
    /// let region: Region = extent!(n = 5).into();
    /// let mesh = ValueMesh::from_dense(region, vec![1, 1, 2, 2, 3]).unwrap();
    /// // Internally compressed to three runs: [1, 1], [2, 2], [3]
    /// ```
    #[allow(clippy::result_large_err)]
    pub fn from_dense(region: Region, values: Vec<T>) -> crate::v1::Result<Self> {
        let mut vm = Self::new(region, values)?;
        vm.compress_adjacent_in_place();
        Ok(vm)
    }
}

impl<F: Future> ValueMesh<F> {
    /// Await all futures in the mesh, yielding a `ValueMesh` of their
    /// outputs.
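    ///
    /// # Examples
    ///
    /// A sketch (not a doctest) using ready futures, mirroring the
    /// tests below:
    ///
    /// ```ignore
    /// let region: Region = extent!(x = 2).into();
    /// let mesh = ValueMesh::new(region, vec![
    ///     futures::future::ready(1),
    ///     futures::future::ready(2),
    /// ])?;
    /// let joined = mesh.join().await;
    /// assert_eq!(joined.values().collect::<Vec<_>>(), vec![1, 2]);
    /// ```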
    pub async fn join(self) -> ValueMesh<F::Output> {
        let ValueMesh { region, rep } = self;

        match rep {
            Rep::Dense { values } => {
                let results = futures::future::join_all(values).await;
                ValueMesh::new_unchecked(region, results)
            }
            Rep::Compressed { .. } => {
                unreachable!("join() not implemented for compressed meshes")
            }
        }
    }
}

impl<T, E> ValueMesh<Result<T, E>> {
    /// Transposes a `ValueMesh<Result<T, E>>` into a
    /// `Result<ValueMesh<T>, E>`.
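    ///
    /// # Examples
    ///
    /// A sketch (not a doctest); any `Err` value short-circuits:
    ///
    /// ```ignore
    /// let region: Region = extent!(x = 2).into();
    /// let mesh = ValueMesh::new(region, vec![Ok(1), Err("boom")])?;
    /// assert_eq!(mesh.transpose(), Err("boom"));
    /// ```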
    pub fn transpose(self) -> Result<ValueMesh<T>, E> {
        let ValueMesh { region, rep } = self;

        match rep {
            Rep::Dense { values } => {
                let values = values.into_iter().collect::<Result<Vec<T>, E>>()?;
                Ok(ValueMesh::new_unchecked(region, values))
            }
            Rep::Compressed { table, runs } => {
                let table: Vec<T> = table.into_iter().collect::<Result<Vec<T>, E>>()?;
                Ok(ValueMesh {
                    region,
                    rep: Rep::Compressed { table, runs },
                })
            }
        }
    }
}

impl<T: 'static> view::Ranked for ValueMesh<T> {
    type Item = T;

    /// Returns the region that defines this mesh's shape and rank
    /// order.
    fn region(&self) -> &Region {
        &self.region
    }

    /// Looks up the value at the given linearized rank.
    ///
    /// Works transparently for both dense and compressed
    /// representations:
    /// - In the dense case, it simply indexes into the `values`
    ///   vector.
    /// - In the compressed case, it performs a binary search over run
    ///   boundaries to find which run contains the given rank, then
    ///   returns the corresponding entry from `table`.
    ///
    /// Returns `None` if the rank is out of bounds.
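    ///
    /// A sketch (not a doctest) of a lookup on a compressed mesh:
    ///
    /// ```ignore
    /// let region: Region = extent!(n = 5).into();
    /// let mesh = ValueMesh::from_dense(region, vec![1, 1, 2, 2, 3])?;
    /// // Rank 3 falls in the run [2..4), whose table entry is 2.
    /// assert_eq!(mesh.get(3), Some(&2));
    /// ```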
    fn get(&self, rank: usize) -> Option<&Self::Item> {
        if rank >= self.region.num_ranks() {
            return None;
        }

        match &self.rep {
            Rep::Dense { values } => values.get(rank),

            Rep::Compressed { table, runs } => {
                let rank = rank as u64;

                // Binary search over runs: find the one whose range
                // contains `rank`.
                let idx = runs
                    .binary_search_by(|run| {
                        if run.end <= rank {
                            Ordering::Less
                        } else if run.start > rank {
                            Ordering::Greater
                        } else {
                            Ordering::Equal
                        }
                    })
                    .ok()?;

                // Map the run's table ID to its actual value.
                let id = runs[idx].id as usize;
                table.get(id)
            }
        }
    }
}

impl<T: Clone + 'static> view::RankedSliceable for ValueMesh<T> {
    fn sliced(&self, region: Region) -> Self {
        debug_assert!(region.is_subset(self.region()), "sliced: not a subset");
        let ranks: Vec<T> = self
            .region()
            .remap(&region)
            .unwrap()
            .map(|index| self.get(index).unwrap().clone())
            .collect();
        debug_assert_eq!(
            region.num_ranks(),
            ranks.len(),
            "sliced: cardinality mismatch"
        );
        Self::new_unchecked(region, ranks)
    }
}

impl<T> view::BuildFromRegion<T> for ValueMesh<T> {
    type Error = crate::v1::Error;

    fn build_dense(region: Region, values: Vec<T>) -> Result<Self, Self::Error> {
        Self::new(region, values)
    }

    fn build_dense_unchecked(region: Region, values: Vec<T>) -> Self {
        Self::new_unchecked(region, values)
    }
}

impl<T> view::BuildFromRegionIndexed<T> for ValueMesh<T> {
    type Error = crate::v1::Error;

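    /// Builds the mesh from `(rank, value)` pairs in any order, with
    /// last-write-wins semantics on duplicate ranks. An illustrative
    /// sketch, mirroring the tests below (not a doctest):
    ///
    /// ```ignore
    /// let region: Region = extent!(x = 1, y = 3).into();
    /// let pairs = vec![(2, "c"), (0, "a"), (1, "b"), (1, "B")];
    /// let mesh = pairs.into_iter().collect_indexed::<ValueMesh<_>>(region)?;
    /// assert_eq!(mesh.values().collect::<Vec<_>>(), vec!["a", "B", "c"]);
    /// ```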
    fn build_indexed(
        region: Region,
        pairs: impl IntoIterator<Item = (usize, T)>,
    ) -> Result<Self, Self::Error> {
        let n = region.num_ranks();

        // Allocate uninitialized buffer for T.
        // Note: Vec<MaybeUninit<T>>'s Drop will only free the
        // allocation; it never runs T's destructor. We must
        // explicitly drop any initialized elements (DropGuard) or
        // convert into Vec<T>.
        let mut buf: Vec<MaybeUninit<T>> = Vec::with_capacity(n);
        // SAFETY: set `len = n` to treat the buffer as n uninit slots
        // of `MaybeUninit<T>`. We never read before `ptr::write`,
        // drop only slots marked initialized (bitset), and convert to
        // `Vec<T>` only once all 0..n are initialized (guard enforces
        // this).
        unsafe {
            buf.set_len(n);
        }

        // Compact bitset for occupancy.
        let words = n.div_ceil(64);
        let mut bits = vec![0u64; words];
        let mut filled = 0usize;

        // Drop guard: cleans up initialized elements on early exit.
        // Stores raw, non-borrowed pointers (`NonNull`), so we don't
        // hold Rust references for the whole scope. This allows
        // mutating `buf`/`bits` inside the loop while still letting
        // the guard access them if dropped early.
        struct DropGuard<T> {
            buf: NonNull<MaybeUninit<T>>,
            bits: NonNull<u64>,
            n_elems: usize,
            n_words: usize,
            disarm: bool,
        }

        impl<T> DropGuard<T> {
            /// # Safety
            /// - `buf` points to `buf.len()` contiguous
            ///   `MaybeUninit<T>` and outlives the guard.
            /// - `bits` points to `bits.len()` contiguous `u64` words
            ///   and outlives the guard.
            /// - For every set bit `i` in `bits`, `buf[i]` has been
            ///   initialized with a valid `T` (and on duplicates, the
            ///   previous `T` at `i` was dropped before overwrite).
            /// - While the guard is alive, caller may mutate
            ///   `buf`/`bits` (the guard holds only raw pointers; it
            ///   does not create Rust borrows).
            /// - On drop (when not disarmed), the guard will read
            ///   `bits`, mask tail bits in the final word, and
            ///   `drop_in_place` each initialized `buf[i]`—never
            ///   accessing beyond `buf.len()` / `bits.len()`.
            /// - If a slice is empty, the stored pointer may be
            ///   `dangling()`; it is never dereferenced when the
            ///   corresponding length is zero.
            unsafe fn new(buf: &mut [MaybeUninit<T>], bits: &mut [u64]) -> Self {
                let n_elems = buf.len();
                let n_words = bits.len();
                // Invariant typically: n_words == (n_elems + 63) / 64
                // but we don't *require* it; tail is masked in Drop.
                Self {
                    buf: NonNull::new(buf.as_mut_ptr()).unwrap_or_else(NonNull::dangling),
                    bits: NonNull::new(bits.as_mut_ptr()).unwrap_or_else(NonNull::dangling),
                    n_elems,
                    n_words,
                    disarm: false,
                }
            }

            #[inline]
            fn disarm(&mut self) {
                self.disarm = true;
            }
        }

        impl<T> Drop for DropGuard<T> {
            fn drop(&mut self) {
                if self.disarm {
                    return;
                }

                // SAFETY:
                // - `self.buf` points to `n_elems` contiguous
                //   `MaybeUninit<T>` slots (or may be dangling if
                //   `n_elems == 0`); `self.bits` points to `n_words`
                //   contiguous `u64` words (or may be dangling if
                //   `n_words == 0`).
                // - Loop bounds ensure `w < n_words` when reading
                //   `*bits_base.add(w)`.
                // - For the final word we mask unused tail bits so
                //   any computed index `i = w * 64 + tz` always
                //   satisfies `i < n_elems` before we dereference
                //   `buf_base.add(i)`.
                // - Only slots whose bits are set are dropped, so no
                //   double-drops.
                // - No aliasing with active Rust borrows: the guard
                //   holds raw pointers and runs in `Drop` after the
                //   fill loop.
                unsafe {
                    let buf_base = self.buf.as_ptr();
                    let bits_base = self.bits.as_ptr();

                    for w in 0..self.n_words {
                        // Load word.
                        let mut word = *bits_base.add(w);

                        // Mask off bits beyond `n_elems` in the final
                        // word (if any).
                        if w == self.n_words.saturating_sub(1) {
                            let used_bits = self.n_elems.saturating_sub(w * 64);
                            if used_bits < 64 {
                                let mask = if used_bits == 0 {
                                    0
                                } else {
                                    (1u64 << used_bits) - 1
                                };
                                word &= mask;
                            }
                        }

                        // Fast scan set bits.
                        while word != 0 {
                            let tz = word.trailing_zeros() as usize;
                            let i = w * 64 + tz;
                            debug_assert!(i < self.n_elems);

                            let slot = buf_base.add(i);
                            // Drop the initialized element.
                            ptr::drop_in_place((*slot).as_mut_ptr());

                            // Clear the bit we just handled.
                            word &= word - 1;
                        }
                    }
                }
            }
        }

        // SAFETY:
        // - `buf` and `bits` are freshly allocated Vecs with
        //   capacity/len set to cover exactly `n_elems` and `n_words`,
        //   so their `.as_mut_ptr()` is valid for that many elements.
        // - Both slices live at least as long as the guard, and are
        //   not moved until after the guard is disarmed.
        // - No aliasing occurs: the guard holds only raw pointers and
        //   the fill loop mutates through those same allocations.
        let mut guard = unsafe { DropGuard::new(&mut buf, &mut bits) };

        for (rank, value) in pairs {
            // Single bounds check up front.
            if rank >= guard.n_elems {
                return Err(crate::v1::Error::InvalidRankCardinality {
                    expected: guard.n_elems,
                    actual: rank + 1,
                });
            }

            // Compute word index and bit mask once.
            let w = rank / 64;
            let b = rank % 64;
            let mask = 1u64 << b;

            // SAFETY:
            // - `rank < guard.n_elems` was checked above, so
            //   `buf_slot = buf.add(rank)` is within the
            //   `Vec<MaybeUninit<T>>` allocation.
            // - `w = rank / 64` and `bits.len() == (n + 63) / 64`
            //   ensure `bits_ptr = bits.add(w)` is in-bounds.
            // - If `(word & mask) != 0`, then this slot was
            //   previously initialized;
            //   `drop_in_place((*buf_slot).as_mut_ptr())` is valid and
            //   leaves the slot uninitialized.
            // - If the bit was clear, we set it and count `filled +=
            //   1`.
            // - `(*buf_slot).write(value)` is valid in both cases:
            //   either writing into an uninitialized slot or
            //   immediately after dropping the prior `T`.
            // - No aliasing with Rust references: the guard holds raw
            //   pointers and we have exclusive ownership of
            //   `buf`/`bits` within this function.
            unsafe {
                // Pointers from the guard (no long-lived & borrows).
                let bits_ptr = guard.bits.as_ptr().add(w);
                let buf_slot = guard.buf.as_ptr().add(rank);

                // Read the current word.
                let word = *bits_ptr;

                if (word & mask) != 0 {
                    // Duplicate: drop old value before overwriting.
                    core::ptr::drop_in_place((*buf_slot).as_mut_ptr());
                    // (Bit already set; no need to set again; don't
                    // bump `filled`.)
                } else {
                    // First time we see this rank.
                    *bits_ptr = word | mask;
                    filled += 1;
                }

                // Write new value into the slot.
                (*buf_slot).write(value);
            }
        }

        if filled != n {
            // Missing ranks: actual = number of distinct ranks seen.
            return Err(crate::v1::Error::InvalidRankCardinality {
                expected: n,
                actual: filled,
            });
        }

        // Success: prevent cleanup.
        guard.disarm();

        // SAFETY: all n slots are initialized.
        let ranks = unsafe {
            let ptr = buf.as_mut_ptr() as *mut T;
            let len = buf.len();
            let cap = buf.capacity();
            // Prevent `buf` (Vec<MaybeUninit<T>>) from freeing the
            // allocation. Ownership of the buffer is about to be
            // transferred to `Vec<T>` via `from_raw_parts`.
            // Forgetting avoids a double free.
            mem::forget(buf);
            Vec::from_raw_parts(ptr, len, cap)
        };

        Ok(Self::new_unchecked(region, ranks))
    }
}

impl<T: PartialEq> ValueMesh<T> {
    /// Compresses the mesh in place using run-length encoding (RLE).
    ///
    /// This method scans the mesh's dense values, coalescing adjacent
    /// runs of identical elements into a compact [`Rep::Compressed`]
    /// representation. It replaces the internal storage (`rep`) with
    /// the compressed form.
    ///
    /// # Behavior
    /// - If the mesh is already compressed, this is a **no-op**.
    /// - If the mesh is dense, it consumes the current `Vec<T>` and
    ///   rebuilds the representation as a run table plus value table.
    /// - Only *adjacent* equal values are merged; non-contiguous
    ///   duplicates remain distinct.
    ///
    /// # Requirements
    /// - `T` must implement [`PartialEq`] (to detect equal values).
    ///
    /// This operation is lossless: expanding the compressed mesh back
    /// into a dense vector yields the same sequence of values.
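    ///
    /// # Examples
    ///
    /// A sketch (not a doctest):
    ///
    /// ```ignore
    /// let region: Region = extent!(n = 5).into();
    /// let mut mesh = ValueMesh::new(region, vec![1, 1, 2, 2, 1])?;
    /// mesh.compress_adjacent_in_place();
    /// // Three runs: [0..2) -> 1, [2..4) -> 2, [4..5) -> 1. The
    /// // trailing 1 stays a separate run: only *adjacent* equals merge.
    /// assert_eq!(mesh.values().collect::<Vec<_>>(), vec![1, 1, 2, 2, 1]);
    /// ```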
    pub fn compress_adjacent_in_place(&mut self) {
        self.compress_adjacent_in_place_by(|a, b| a == b)
    }
}

impl<T: Clone + PartialEq> ValueMesh<T> {
    /// Materializes this mesh into a vector of `(Range<usize>, T)`
    /// runs.
    ///
    /// For a dense representation, this walks the value vector and
    /// groups adjacent equal values into contiguous runs. The result
    /// is equivalent to what would be stored in a compressed
    /// representation, but the mesh itself is not mutated or
    /// re-encoded. This is purely a read-only view.
    ///
    /// For a compressed representation, the stored runs are simply
    /// cloned.
    ///
    /// This method is intended for inspection, testing, and
    /// diff/merge operations that need a uniform view of value runs
    /// without changing the underlying representation.
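    ///
    /// A sketch (not a doctest):
    ///
    /// ```ignore
    /// let mesh = ValueMesh::from_dense(extent!(n = 3).into(), vec![1, 1, 2])?;
    /// assert_eq!(mesh.materialized_runs(), vec![(0..2, 1), (2..3, 2)]);
    /// ```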
    fn materialized_runs(&self) -> Vec<(Range<usize>, T)> {
        match &self.rep {
            Rep::Dense { values } => {
                // Coalesce adjacent equals into runs.
                let mut out = Vec::new();
                if values.is_empty() {
                    return out;
                }
                let mut start = 0usize;
                for i in 1..values.len() {
                    if values[i] != values[i - 1] {
                        out.push((start..i, values[i - 1].clone()));
                        start = i;
                    }
                }
                out.push((start..values.len(), values.last().unwrap().clone()));
                out
            }
            Rep::Compressed { table, runs } => runs
                .iter()
                .map(|r| {
                    let id = r.id as usize;
                    ((r.start as usize..r.end as usize), table[id].clone())
                })
                .collect(),
        }
    }
}

impl<T: Clone + Eq> ValueMesh<T> {
    /// Merge a sparse overlay into this mesh.
    ///
    /// Overlay segments are applied with **last-writer-wins**
    /// precedence on overlap (identical to `RankedValues::merge_from`
    /// behavior). The result is stored compressed.
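    ///
    /// # Examples
    ///
    /// A sketch (not a doctest), assuming an overlay built with
    /// `ValueOverlay::try_from_runs`:
    ///
    /// ```ignore
    /// let region: Region = extent!(n = 4).into();
    /// let mut mesh = ValueMesh::from_single(region, 0u32);
    /// let overlay = ValueOverlay::try_from_runs(vec![(1..3, 5u32)])?;
    /// mesh.merge_from_overlay(overlay)?;
    /// // Overlay wins on overlap: [0, 5, 5, 0]
    /// assert_eq!(mesh.values().collect::<Vec<_>>(), vec![0, 5, 5, 0]);
    /// ```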
    pub fn merge_from_overlay(&mut self, overlay: ValueOverlay<T>) -> Result<(), BuildError> {
        let n = self.region.num_ranks();

        // Bounds validation (structure already validated by
        // ValueOverlay).
        for (r, _) in overlay.runs() {
            if r.end > n {
                return Err(BuildError::OutOfBounds {
                    range: r.clone(),
                    region_len: n,
                });
            }
        }

        // Left: current mesh as normalized value-bearing runs.
        let left = self.materialized_runs();
        // Right: overlay runs (already sorted, non-overlapping,
        // coalesced).
        let right: Vec<(std::ops::Range<usize>, T)> = overlay.runs().cloned().collect();

        // Merge with overlay precedence, reusing the same splitting
        // strategy as RankedValues::merge_from.
        let merged = rle::merge_value_runs(left, right);

        // Re-encode to compressed representation:
        let (table, raw_runs) = rle::rle_from_value_runs(merged);
        let runs = raw_runs
            .into_iter()
            .map(|(r, id)| Run::new(r.start, r.end, id))
            .collect();
        self.rep = Rep::Compressed { table, runs };

        Ok(())
    }
}

impl<T> ValueMesh<T> {
    /// Compresses the mesh in place using a custom equivalence
    /// predicate.
    ///
    /// This is a generalized form of
    /// [`ValueMesh::compress_adjacent_in_place`] that merges adjacent
    /// values according to an arbitrary predicate `same(a, b)`,
    /// rather than relying on `PartialEq`.
    ///
    /// # Behavior
    /// - If the mesh is already compressed, this is a **no-op**.
    /// - Otherwise, consumes the dense `Vec<T>` and replaces it with
    ///   a run-length encoded (`Rep::Compressed`) representation,
    ///   where consecutive elements satisfying `same(a, b)` are
    ///   coalesced into a single run.
    ///
    /// # Requirements
    /// - The predicate must be reflexive and symmetric for
    ///   correctness.
    ///
    /// This operation is lossless: expanding the compressed mesh
    /// reproduces the original sequence exactly under the same
    /// equivalence.
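    ///
    /// # Examples
    ///
    /// A sketch (not a doctest) using case-insensitive equivalence;
    /// which representative value each run keeps is an implementation
    /// detail of `rle::rle_from_dense`:
    ///
    /// ```ignore
    /// let region: Region = extent!(n = 3).into();
    /// let mut mesh = ValueMesh::new(region, vec!["a", "A", "b"])?;
    /// mesh.compress_adjacent_in_place_by(|x, y| x.eq_ignore_ascii_case(y));
    /// // Coalesced into two runs: [0..2) and [2..3).
    /// ```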
    pub fn compress_adjacent_in_place_by<F>(&mut self, same: F)
    where
        F: FnMut(&T, &T) -> bool,
    {
        let values = match &mut self.rep {
            Rep::Dense { values } => std::mem::take(values),
            Rep::Compressed { .. } => return,
        };
        let (table, raw_runs) = rle::rle_from_dense(values, same);
        let runs = raw_runs
            .into_iter()
            .map(|(r, id)| Run::new(r.start, r.end, id))
            .collect();
        self.rep = Rep::Compressed { table, runs };
    }
}

/// Accumulates sparse overlay updates into an authoritative mesh.
///
/// Lifecycle:
/// - Mailbox initializes `State` via `Default` (empty mesh).
/// - On the first update, the accumulator clones `self` (the template
///   passed to `open_accum_port_opts`) into `state`. Callers pass a
///   template such as `StatusMesh::from_single(region, NotExist)`.
/// - Each overlay update is merged with right-wins semantics via
///   `merge_from_overlay`, and the accumulator emits a *full*
///   ValueMesh.
///
/// The accumulator's state is a [`ValueMesh<T>`] and its updates are
/// [`ValueOverlay<T>`] instances. On each update, the overlay's
/// normalized runs are merged into the mesh using
/// [`ValueMesh::merge_from_overlay`] with right-wins semantics.
///
/// This enables incremental reduction of sparse updates across
/// distributed reducers without materializing dense data.
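///
/// A sketch of the accumulation flow (not a doctest; `Status::Running`
/// is used illustratively here):
///
/// ```ignore
/// // Template: full mesh of NotExist over the target region.
/// let template = StatusMesh::from_single(region.clone(), Status::NotExist);
/// // The mailbox starts from `Default`, i.e. an empty mesh.
/// let mut state = ValueMesh::default();
///
/// let update = ValueOverlay::try_from_runs(vec![(0..2, Status::Running)])?;
/// template.accumulate(&mut state, update)?;
/// // `state` is now the full template with ranks 0..2 overwritten.
/// ```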
952impl<T> Accumulator for ValueMesh<T>
953where
954    T: Eq + Clone + Named,
955{
956    type State = Self;
957    type Update = ValueOverlay<T>;
958
959    fn accumulate(&self, state: &mut Self::State, update: Self::Update) -> anyhow::Result<()> {
960        // Mailbox starts with A::State::default() (empty). On the
961        // first update, re-initialize to our template (self), which
962        // the caller constructed as "full NotExist over the target
963        // region".
964        if state.region().num_ranks() == 0 {
965            *state = self.clone();
966        }
967
968        // Apply sparse delta into the authoritative mesh
969        // (right-wins).
970        state.merge_from_overlay(update)?;
971        Ok(())
972    }
973
974    fn reducer_spec(&self) -> Option<ReducerSpec> {
975        Some(ReducerSpec {
976            typehash: <ValueOverlayReducer<T> as Named>::typehash(),
977            builder_params: None,
978        })
979    }
980}
981
982/// Marker reducer type for [`ValueOverlay<T>`].
983///
984/// This reducer carries no state; it exists only to bind a concrete
985/// type parameter `T` to the [`CommReducer`] implementation below.
986/// Reduction is purely functional and uses right-wins merge semantics
987/// defined in [`merge_value_runs`].
988#[derive(Named)]
989struct ValueOverlayReducer<T>(std::marker::PhantomData<T>);
990
991/// Reducer for sparse overlay updates.
992///
993/// Combines two [`ValueOverlay<T>`] updates using right-wins
994/// semantics: overlapping runs from `right` overwrite those in
995/// `left`. The merged runs are then normalized and validated via
996/// [`ValueOverlay::try_from_runs`].
997///
998/// Used by the corresponding [`Accumulator`] to perform distributed
999/// reduction of incremental mesh updates.
1000impl<T> CommReducer for ValueOverlayReducer<T>
1001where
1002    T: Eq + Clone + Named,
1003{
1004    type Update = ValueOverlay<T>;
1005
1006    // Last-writer-wins merge of two sparse overlays.
1007    fn reduce(&self, left: Self::Update, right: Self::Update) -> anyhow::Result<Self::Update> {
1008        // 1) Merge runs with right precedence.
1009        let merged = crate::v1::value_mesh::rle::merge_value_runs(
1010            left.runs().cloned().collect(),
1011            right.runs().cloned().collect(),
1012        );
1013        // 2) Re-normalize to an overlay (validates, coalesces).
1014        Ok(ValueOverlay::try_from_runs(merged)?)
1015    }
1016}
1017
1018// register for concrete types:
1019
1020hyperactor::internal_macro_support::inventory::submit! {
1021    ReducerFactory {
1022        typehash_f: <ValueOverlayReducer<crate::resource::Status> as Named>::typehash,
1023        builder_f: |_| Ok(Box::new(ValueOverlayReducer::<crate::resource::Status>(PhantomData))),
1024    }
1025}
1026
1027#[cfg(test)]
1028mod tests {
1029    use std::convert::Infallible;
1030    use std::future::Future;
1031    use std::pin::Pin;
1032    use std::task::Context;
1033    use std::task::Poll;
1034    use std::task::RawWaker;
1035    use std::task::RawWakerVTable;
1036    use std::task::Waker;
1037
1038    use futures::executor::block_on;
1039    use futures::future;
1040    use ndslice::extent;
1041    use ndslice::strategy::gen_region;
1042    use ndslice::view::CollectExactMeshExt;
1043    use ndslice::view::CollectIndexedMeshExt;
1044    use ndslice::view::CollectMeshExt;
1045    use ndslice::view::MapIntoExt;
1046    use ndslice::view::Ranked;
1047    use ndslice::view::RankedSliceable;
1048    use ndslice::view::ViewExt;
1049    use proptest::prelude::*;
1050    use proptest::strategy::ValueTree;
1051    use serde_json;
1052
1053    use super::*;
1054
1055    #[test]
1056    fn value_mesh_new_ok() {
1057        let region: Region = extent!(replica = 2, gpu = 3).into();
1058        let mesh = ValueMesh::new(region.clone(), (0..6).collect()).expect("new should succeed");
1059        assert_eq!(mesh.region().num_ranks(), 6);
1060        assert_eq!(mesh.values().count(), 6);
1061        assert_eq!(mesh.values().collect::<Vec<_>>(), vec![0, 1, 2, 3, 4, 5]);
1062    }
1063
1064    #[test]
1065    fn value_mesh_new_len_mismatch_is_error() {
1066        let region: Region = extent!(replica = 2, gpu = 3).into();
1067        let err = ValueMesh::new(region, vec![0_i32; 5]).unwrap_err();
1068        match err {
1069            crate::v1::Error::InvalidRankCardinality { expected, actual } => {
1070                assert_eq!(expected, 6);
1071                assert_eq!(actual, 5);
1072            }
1073            other => panic!("unexpected error: {other:?}"),
1074        }
1075    }
1076
1077    #[test]
1078    fn value_mesh_transpose_ok_and_err() {
1079        let region: Region = extent!(x = 2).into();
1080
1081        // ok case
1082        let ok_mesh = ValueMesh::new(region.clone(), vec![Ok::<_, Infallible>(1), Ok(2)]).unwrap();
1083        let ok = ok_mesh.transpose().unwrap();
1084        assert_eq!(ok.values().collect::<Vec<_>>(), vec![1, 2]);
1085
1086        // err case: propagate user E
1087        #[derive(Debug, PartialEq)]
1088        enum E {
1089            Boom,
1090        }
1091        let err_mesh = ValueMesh::new(region, vec![Ok(1), Err(E::Boom)]).unwrap();
1092        let err = err_mesh.transpose().unwrap_err();
1093        assert_eq!(err, E::Boom);
1094    }
1095
1096    #[test]
1097    fn value_mesh_join_preserves_region_and_values() {
1098        let region: Region = extent!(x = 2, y = 2).into();
1099        let futs = vec![
1100            future::ready(10),
1101            future::ready(11),
1102            future::ready(12),
1103            future::ready(13),
1104        ];
1105        let mesh = ValueMesh::new(region.clone(), futs).unwrap();
1106
1107        let joined = block_on(mesh.join());
1108        assert_eq!(joined.region().num_ranks(), 4);
1109        assert_eq!(joined.values().collect::<Vec<_>>(), vec![10, 11, 12, 13]);
1110    }
1111
1112    #[test]
1113    fn collect_mesh_ok() {
1114        let region: Region = extent!(x = 2, y = 3).into();
1115        let mesh = (0..6)
1116            .collect_mesh::<ValueMesh<_>>(region.clone())
1117            .expect("collect_mesh should succeed");
1118
1119        assert_eq!(mesh.region().num_ranks(), 6);
1120        assert_eq!(mesh.values().collect::<Vec<_>>(), vec![0, 1, 2, 3, 4, 5]);
1121    }
1122
1123    #[test]
1124    fn collect_mesh_len_too_short_is_error() {
1125        let region: Region = extent!(x = 2, y = 3).into();
1126        let err = (0..5).collect_mesh::<ValueMesh<_>>(region).unwrap_err();
1127
1128        match err {
1129            crate::v1::Error::InvalidRankCardinality { expected, actual } => {
1130                assert_eq!(expected, 6);
1131                assert_eq!(actual, 5);
1132            }
1133            other => panic!("unexpected error: {other:?}"),
1134        }
1135    }
1136
1137    #[test]
1138    fn collect_mesh_len_too_long_is_error() {
1139        let region: Region = extent!(x = 2, y = 3).into();
1140        let err = (0..7).collect_mesh::<ValueMesh<_>>(region).unwrap_err();
1141        match err {
1142            crate::v1::Error::InvalidRankCardinality { expected, actual } => {
1143                assert_eq!(expected, 6);
1144                assert_eq!(actual, 7);
1145            }
1146            other => panic!("unexpected error: {other:?}"),
1147        }
1148    }
1149
1150    #[test]
1151    fn collect_mesh_from_map_pipeline() {
1152        let region: Region = extent!(x = 2, y = 2).into();
1153        let mesh = (0..4)
1154            .map(|i| i * 10)
1155            .collect_mesh::<ValueMesh<_>>(region.clone())
1156            .unwrap();
1157
1158        assert_eq!(mesh.region().num_ranks(), 4);
1159        assert_eq!(mesh.values().collect::<Vec<_>>(), vec![0, 10, 20, 30]);
1160    }
1161
1162    #[test]
1163    fn collect_exact_mesh_ok() {
1164        let region: Region = extent!(x = 2, y = 3).into();
1165        let mesh = (0..6)
1166            .collect_exact_mesh::<ValueMesh<_>>(region.clone())
1167            .expect("collect_exact_mesh should succeed");
1168
1169        assert_eq!(mesh.region().num_ranks(), 6);
1170        assert_eq!(mesh.values().collect::<Vec<_>>(), vec![0, 1, 2, 3, 4, 5]);
1171    }
1172
1173    #[test]
1174    fn collect_exact_mesh_len_too_short_is_error() {
1175        let region: Region = extent!(x = 2, y = 3).into();
1176        let err = (0..5)
1177            .collect_exact_mesh::<ValueMesh<_>>(region)
1178            .unwrap_err();
1179
1180        match err {
1181            crate::v1::Error::InvalidRankCardinality { expected, actual } => {
1182                assert_eq!(expected, 6);
1183                assert_eq!(actual, 5);
1184            }
1185            other => panic!("unexpected error: {other:?}"),
1186        }
1187    }
1188
1189    #[test]
1190    fn collect_exact_mesh_len_too_long_is_error() {
1191        let region: Region = extent!(x = 2, y = 3).into();
1192        let err = (0..7)
1193            .collect_exact_mesh::<ValueMesh<_>>(region)
1194            .unwrap_err();
1195
1196        match err {
1197            crate::v1::Error::InvalidRankCardinality { expected, actual } => {
1198                assert_eq!(expected, 6);
1199                assert_eq!(actual, 7);
1200            }
1201            other => panic!("unexpected error: {other:?}"),
1202        }
1203    }
1204
1205    #[test]
1206    fn collect_exact_mesh_from_map_pipeline() {
1207        let region: Region = extent!(x = 2, y = 2).into();
1208        let mesh = (0..4)
1209            .map(|i| i * 10)
1210            .collect_exact_mesh::<ValueMesh<_>>(region.clone())
1211            .unwrap();
1212
1213        assert_eq!(mesh.region().num_ranks(), 4);
1214        assert_eq!(mesh.values().collect::<Vec<_>>(), vec![0, 10, 20, 30]);
1215    }
1216
1217    #[test]
1218    fn collect_indexed_ok_shuffled() {
1219        let region: Region = extent!(x = 2, y = 3).into();
1220        // (rank, value) in shuffled order; values = rank * 10
1221        let pairs = vec![(3, 30), (0, 0), (5, 50), (2, 20), (1, 10), (4, 40)];
1222        let mesh = pairs
1223            .into_iter()
1224            .collect_indexed::<ValueMesh<_>>(region.clone())
1225            .unwrap();
1226
1227        assert_eq!(mesh.region().num_ranks(), 6);
1228        assert_eq!(
1229            mesh.values().collect::<Vec<_>>(),
1230            vec![0, 10, 20, 30, 40, 50]
1231        );
1232    }
1233
1234    #[test]
1235    fn collect_indexed_missing_rank_is_error() {
1236        let region: Region = extent!(x = 2, y = 2).into(); // 4
1237        // Missing rank 3
1238        let pairs = vec![(0, 100), (1, 101), (2, 102)];
1239        let err = pairs
1240            .into_iter()
1241            .collect_indexed::<ValueMesh<_>>(region)
1242            .unwrap_err();
1243
1244        match err {
1245            crate::v1::Error::InvalidRankCardinality { expected, actual } => {
1246                assert_eq!(expected, 4);
1247                assert_eq!(actual, 3); // Distinct ranks seen.
1248            }
1249            other => panic!("unexpected error: {other:?}"),
1250        }
1251    }
1252
1253    #[test]
1254    fn collect_indexed_out_of_bounds_is_error() {
1255        let region: Region = extent!(x = 2, y = 2).into(); // 4 (valid ranks 0..=3)
1256        let pairs = vec![(0, 1), (4, 9)]; // 4 is out-of-bounds
1257        let err = pairs
1258            .into_iter()
1259            .collect_indexed::<ValueMesh<_>>(region)
1260            .unwrap_err();
1261
1262        match err {
1263            crate::v1::Error::InvalidRankCardinality { expected, actual } => {
1264                assert_eq!(expected, 4);
1265                assert_eq!(actual, 5); // offending index + 1
1266            }
1267            other => panic!("unexpected error: {other:?}"),
1268        }
1269    }
1270
1271    #[test]
1272    fn collect_indexed_duplicate_last_write_wins() {
1273        let region: Region = extent!(x = 1, y = 3).into(); // 3
1274        // rank 1 appears twice; last value should stick
1275        let pairs = vec![(0, 7), (1, 8), (1, 88), (2, 9)];
1276        let mesh = pairs
1277            .into_iter()
1278            .collect_indexed::<ValueMesh<_>>(region.clone())
1279            .unwrap();
1280
1281        assert_eq!(mesh.values().collect::<Vec<_>>(), vec![7, 88, 9]);
1282    }
1283
1284    // Indexed collector naïve implementation (for reference).
1285    #[allow(clippy::result_large_err)]
1286    fn build_value_mesh_indexed<T>(
1287        region: Region,
1288        pairs: impl IntoIterator<Item = (usize, T)>,
1289    ) -> crate::v1::Result<ValueMesh<T>> {
1290        let n = region.num_ranks();
1291
1292        // Buffer for exactly n slots; fill by rank.
1293        let mut buf: Vec<Option<T>> = std::iter::repeat_with(|| None).take(n).collect();
1294        let mut filled = 0usize;
1295
1296        for (rank, value) in pairs {
1297            if rank >= n {
1298                // Out-of-bounds: report `expected` = n, `actual` =
1299                // offending index + 1; i.e. number of ranks implied
1300                // so far.
1301                return Err(crate::v1::Error::InvalidRankCardinality {
1302                    expected: n,
1303                    actual: rank + 1,
1304                });
1305            }
1306            if buf[rank].is_none() {
1307                filled += 1;
1308            }
1309            buf[rank] = Some(value); // Last write wins.
1310        }
1311
1312        if filled != n {
1313            // Missing ranks: actual = number of distinct ranks seen.
1314            return Err(crate::v1::Error::InvalidRankCardinality {
1315                expected: n,
1316                actual: filled,
1317            });
1318        }
1319
1320        // All present and in-bounds: unwrap and build unchecked.
1321        let ranks: Vec<T> = buf.into_iter().map(Option::unwrap).collect();
1322        Ok(ValueMesh::new_unchecked(region, ranks))
1323    }
1324
1325    /// This uses the bit-mixing portion of Sebastiano Vigna's
1326    /// [SplitMix64 algorithm](https://prng.di.unimi.it/splitmix64.c)
1327    /// to generate a high-quality 64-bit hash from a usize index.
1328    /// Unlike the full SplitMix64 generator, this is stateless - we
1329    /// accept an arbitrary x as input and apply the mix function to
1330    /// turn `x` deterministically into a "randomized" u64. input
1331    /// always produces the same output.
1332    fn hash_key(x: usize) -> u64 {
1333        let mut z = x as u64 ^ 0x9E3779B97F4A7C15;
1334        z = (z ^ (z >> 30)).wrapping_mul(0xBF58476D1CE4E5B9);
1335        z = (z ^ (z >> 27)).wrapping_mul(0x94D049BB133111EB);
1336        z ^ (z >> 31)
1337    }
1338
1339    /// Shuffle a slice deterministically, using a hash of indices as
1340    /// the key.
1341    ///
1342    /// Each position `i` is assigned a pseudo-random 64-bit key (from
1343    /// `key(i)`), the slice is sorted by those keys, and the
1344    /// resulting permutation is applied in place.
1345    ///
1346    /// The permutation is fully determined by the sequence of indices
1347    /// `0..n` and the chosen `key` function. Running it twice on the
1348    /// same input yields the same "random-looking" arrangement.
1349    ///
1350    /// This is going to be used (below) for property tests: it gives
1351    /// the effect of a shuffle without introducing global RNG state,
1352    /// and ensures that duplicate elements are still ordered
1353    /// consistently (so we can test "last write wins" semantics in
1354    /// collectors).
1355    fn pseudo_shuffle<'a, T: 'a>(v: &'a mut [T], key: impl Fn(usize) -> u64 + Copy) {
1356        // Build perm.
1357        let mut with_keys: Vec<(u64, usize)> = (0..v.len()).map(|i| (key(i), i)).collect();
1358        with_keys.sort_by_key(|&(k, _)| k);
1359        let perm: Vec<usize> = with_keys.into_iter().map(|(_, i)| i).collect();
1360
1361        // In-place permutation using a cycle based approach (e.g.
1362        // https://www.geeksforgeeks.org/dsa/permute-the-elements-of-an-array-following-given-order/).
1363        let mut seen = vec![false; v.len()];
1364        for i in 0..v.len() {
1365            if seen[i] {
1366                continue;
1367            }
1368            let mut a = i;
1369            while !seen[a] {
1370                seen[a] = true;
1371                let b = perm[a];
1372                // Short circuit on the cycle's start index.
1373                if b == i {
1374                    break;
1375                }
1376                v.swap(a, b);
1377                a = b;
1378            }
1379        }
1380    }
1381
1382    // Property: Optimized and reference collectors yield the same
1383    // `ValueMesh` on complete inputs, even with duplicates.
1384    //
1385    // - Begin with a complete set of `(rank, value)` pairs covering
1386    //   all ranks of the region.
1387    // - Add extra pairs at arbitrary ranks (up to `extra_len`), which
1388    //   necessarily duplicate existing entries when `extra_len > 0`.
1389    // - Shuffle the combined pairs deterministically.
1390    // - Collect using both the reference (`try_collect_indexed`) and
1391    //   optimized (`try_collect_indexed_opt`) implementations.
1392    //
1393    // Both collectors must succeed and produce identical results.
1394    // This demonstrates that the optimized version preserves
1395    // last-write-wins semantics and agrees exactly with the reference
1396    // behavior.
1397    proptest! {
1398        #[test]
1399        fn try_collect_opt_equivalence(region in gen_region(1..=4, 6), extra_len in 0usize..=12) {
1400            let n = region.num_ranks();
1401
1402            // Start with one pair per rank (coverage guaranteed).
1403            let mut pairs: Vec<(usize, i64)> = (0..n).map(|r| (r, r as i64)).collect();
1404
1405            // Add some extra duplicates of random in-bounds ranks.
1406            // Their values differ so last-write-wins is observable.
1407            let extras = proptest::collection::vec(0..n, extra_len)
1408                .new_tree(&mut proptest::test_runner::TestRunner::default())
1409                .unwrap()
1410                .current();
1411            for (k, r) in extras.into_iter().enumerate() {
1412                pairs.push((r, (n as i64) + (k as i64)));
1413            }
1414
1415            // Deterministic "shuffle" to fix iteration order across
1416            // both collectors.
1417            pseudo_shuffle(&mut pairs, hash_key);
1418
1419            // Reference vs optimized.
1420            let mesh_ref = build_value_mesh_indexed(region.clone(), pairs.clone()).unwrap();
1421            let mesh_opt = pairs.into_iter().collect_indexed::<ValueMesh<_>>(region.clone()).unwrap();
1422
1423            prop_assert_eq!(mesh_ref.region(), mesh_opt.region());
1424            prop_assert_eq!(mesh_ref.values().collect::<Vec<_>>(), mesh_opt.values().collect::<Vec<_>>());
1425        }
1426    }

    // Property: Optimized and reference collectors report identical
    // errors when ranks are missing.
    //
    // - Begin with a complete set of `(rank, value)` pairs.
    // - Remove one rank so coverage is incomplete.
    // - Shuffle deterministically.
    // - Collect with both implementations.
    //
    // Both must fail with `InvalidRankCardinality` describing the
    // same expected vs. actual counts.
    proptest! {
        #[test]
        fn try_collect_opt_missing_rank_errors_match(region in gen_region(1..=4, 6)) {
            let n = region.num_ranks();
            // Base complete.
            let mut pairs: Vec<(usize, i64)> = (0..n).map(|r| (r, r as i64)).collect();
            // Drop one distinct rank.
            if n > 0 {
                let drop_idx = 0usize; // Deterministic, fine for the property.
                pairs.remove(drop_idx);
            }
            // Shuffle deterministically.
            pseudo_shuffle(&mut pairs, hash_key);

            let ref_err = build_value_mesh_indexed(region.clone(), pairs.clone()).unwrap_err();
            let opt_err = pairs.into_iter().collect_indexed::<ValueMesh<_>>(region).unwrap_err();
            prop_assert_eq!(format!("{ref_err:?}"), format!("{opt_err:?}"));
        }
    }

    // Property: Optimized and reference collectors report identical
    // errors when given out-of-bounds ranks.
    //
    // - Construct a set of `(rank, value)` pairs.
    // - Include at least one pair whose rank is ≥
    //   `region.num_ranks()`.
    // - Shuffle deterministically.
    // - Collect with both implementations.
    //
    // Both must fail with `InvalidRankCardinality`, and the reported
    // error values must match exactly.
    proptest! {
        #[test]
        fn try_collect_opt_out_of_bound_errors_match(region in gen_region(1..=4, 6)) {
            let n = region.num_ranks();
            // One valid pair, then one with an out-of-bounds rank.
            let mut pairs = vec![(0usize, 0i64), (n, 123i64)];
            pseudo_shuffle(&mut pairs, hash_key);

            let ref_err = build_value_mesh_indexed(region.clone(), pairs.clone()).unwrap_err();
            let opt_err = pairs.into_iter().collect_indexed::<ValueMesh<_>>(region).unwrap_err();
            prop_assert_eq!(format!("{ref_err:?}"), format!("{opt_err:?}"));
        }
    }

    #[test]
    fn map_into_preserves_region_and_order() {
        let region: Region = extent!(rows = 2, cols = 3).into();
        let vm = ValueMesh::new_unchecked(region.clone(), vec![0, 1, 2, 3, 4, 5]);

        let doubled: ValueMesh<_> = vm.map_into(|x| x * 2);
        assert_eq!(doubled.region, region);
        assert_eq!(
            doubled.values().collect::<Vec<_>>(),
            vec![0, 2, 4, 6, 8, 10]
        );
    }

    #[test]
    fn map_into_ref_borrows_and_preserves() {
        let region: Region = extent!(n = 4).into();
        let vm = ValueMesh::new_unchecked(
            region.clone(),
            vec!["a".to_string(), "b".into(), "c".into(), "d".into()],
        );

        let lens: ValueMesh<_> = vm.map_into(|s| s.len());
        assert_eq!(lens.region, region);
        assert_eq!(lens.values().collect::<Vec<_>>(), vec![1, 1, 1, 1]);
    }

    #[test]
    fn try_map_into_short_circuits_on_error() {
        let region = extent!(n = 4).into();
        let vm = ValueMesh::new_unchecked(region, vec![1, 2, 3, 4]);

        let res: Result<ValueMesh<i32>, &'static str> =
            vm.try_map_into(|x| if x == &3 { Err("boom") } else { Ok(x + 10) });

        assert!(res.is_err());
        assert_eq!(res.unwrap_err(), "boom");
    }

    #[test]
    fn try_map_into_ref_short_circuits_on_error() {
        let region = extent!(n = 4).into();
        let vm = ValueMesh::new_unchecked(region, vec![1, 2, 3, 4]);

        let res: Result<ValueMesh<i32>, &'static str> =
            vm.try_map_into(|x| if x == &3 { Err("boom") } else { Ok(x + 10) });

        assert!(res.is_err());
        assert_eq!(res.unwrap_err(), "boom");
    }

    // -- Helper to poll `core::future::Ready` without a runtime
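    //
    // A real waker is never needed here: the futures under test are
    // already ready, so `poll_now` polls once with a no-op waker and
    // expects immediate completion. (On newer toolchains,
    // `std::task::Waker::noop()`, stabilized in Rust 1.85, could
    // replace the hand-rolled vtable below.)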
    fn noop_waker() -> Waker {
        fn clone(_: *const ()) -> RawWaker {
            RawWaker::new(std::ptr::null(), &VTABLE)
        }
        fn wake(_: *const ()) {}
        fn wake_by_ref(_: *const ()) {}
        fn drop(_: *const ()) {}
        static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, wake, wake_by_ref, drop);
        // SAFETY: The raw waker never dereferences its data pointer
        // (`null`), and all vtable fns are no-ops. It's only used to
        // satisfy `Context` for polling already-ready futures in
        // tests.
        unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
    }

    fn poll_now<F: Future>(mut fut: F) -> F::Output {
        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);
        // SAFETY: `fut` is a local stack variable that we never move
        // after pinning, and we only use it to poll immediately
        // within this scope. This satisfies the invariants of
        // `Pin::new_unchecked`.
        let mut fut = unsafe { Pin::new_unchecked(&mut fut) };
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(v) => v,
            Poll::Pending => unreachable!("Ready futures must complete immediately"),
        }
    }
    // --

    #[test]
    fn map_into_ready_futures() {
        let region: Region = extent!(r = 2, c = 2).into();
        let vm = ValueMesh::new_unchecked(region.clone(), vec![10, 20, 30, 40]);

        // Map to `core::future::Ready` futures.
        let pending: ValueMesh<core::future::Ready<_>> =
            vm.map_into(|x| core::future::ready(x + 1));
        assert_eq!(pending.region, region);

        // Drive the ready futures without a runtime and collect results.
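        // (`core::future::Ready<T>` is `Clone` for `T: Clone`, so each
        // stored future can be polled via a fresh clone.)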
        let results: Vec<_> = pending.values().map(|f| poll_now(f.clone())).collect();
        assert_eq!(results, vec![11, 21, 31, 41]);
    }

    #[test]
    fn map_into_single_element_mesh() {
        let region: Region = extent!(n = 1).into();
        let vm = ValueMesh::new_unchecked(region.clone(), vec![7]);

        let out: ValueMesh<_> = vm.map_into(|x| x * x);
        assert_eq!(out.region, region);
        assert_eq!(out.values().collect::<Vec<_>>(), vec![49]);
    }

    #[test]
    fn map_into_ref_with_non_clone_field() {
        // A type that intentionally does NOT implement Clone.
        #[derive(Debug, PartialEq, Eq)]
        struct NotClone(i32);

        let region: Region = extent!(x = 3).into();
        let values = vec![(10, NotClone(1)), (20, NotClone(2)), (30, NotClone(3))];
        let mesh: ValueMesh<(i32, NotClone)> =
            values.into_iter().collect_mesh(region.clone()).unwrap();

        let projected: ValueMesh<i32> = mesh.map_into(|t| t.0);
        assert_eq!(projected.values().collect::<Vec<_>>(), vec![10, 20, 30]);
        assert_eq!(projected.region(), &region);
    }

    #[test]
    fn rle_roundtrip_all_equal() {
        let region: Region = extent!(n = 6).into();
        let mut vm = ValueMesh::new_unchecked(region.clone(), vec![42; 6]);

        // Compress and ensure logical equality preserved.
        vm.compress_adjacent_in_place();
        let collected: Vec<_> = vm.values().collect();
        assert_eq!(collected, vec![42, 42, 42, 42, 42, 42]);

        // Random access still works.
        for i in 0..region.num_ranks() {
            assert_eq!(vm.get(i), Some(&42));
        }
        assert_eq!(vm.get(region.num_ranks()), None); // out-of-bounds
    }

    #[test]
    fn rle_roundtrip_alternating() {
        let region: Region = extent!(n = 6).into();
        let original = vec![1, 2, 1, 2, 1, 2];
        let mut vm = ValueMesh::new_unchecked(region.clone(), original.clone());

        vm.compress_adjacent_in_place();
        let collected: Vec<_> = vm.values().collect();
        assert_eq!(collected, original);

        // Spot-check random access after compression.
        assert_eq!(vm.get(0), Some(&1));
        assert_eq!(vm.get(1), Some(&2));
        assert_eq!(vm.get(3), Some(&2));
        assert_eq!(vm.get(5), Some(&2));
    }

    #[test]
    fn rle_roundtrip_blocky_and_slice() {
        // Blocks: 0,0,0 | 1,1 | 2,2,2,2 | 3
        let region: Region = extent!(n = 10).into();
        let original = vec![0, 0, 0, 1, 1, 2, 2, 2, 2, 3];
        let mut vm = ValueMesh::new_unchecked(region.clone(), original.clone());

        vm.compress_adjacent_in_place();
        let collected: Vec<_> = vm.values().collect();
        assert_eq!(collected, original);

        // Slice a middle subregion [3..8) → [1,1,2,2,2]
        let sub_region = region.range("n", 3..8).unwrap();
        let sliced = vm.sliced(sub_region);
        let sliced_vec: Vec<_> = sliced.values().collect();
        assert_eq!(sliced_vec, vec![1, 1, 2, 2, 2]);
    }

    #[test]
    fn rle_idempotent_noop_on_second_call() {
        let region: Region = extent!(n = 7).into();
        let original = vec![9, 9, 9, 8, 8, 9, 9];
        let mut vm = ValueMesh::new_unchecked(region.clone(), original.clone());

        vm.compress_adjacent_in_place();
        let once: Vec<_> = vm.values().collect();
        assert_eq!(once, original);

        // Calling again should be a no-op and still yield identical
        // values.
        vm.compress_adjacent_in_place();
        let twice: Vec<_> = vm.values().collect();
        assert_eq!(twice, original);
    }

    #[test]
    fn rle_works_after_build_indexed() {
        // Build with shuffled pairs, then compress and verify
        // semantics.
        let region: Region = extent!(x = 2, y = 3).into(); // 6
        let pairs = vec![(3, 30), (0, 0), (5, 50), (2, 20), (1, 10), (4, 40)];
        let mut vm = pairs
            .into_iter()
            .collect_indexed::<ValueMesh<_>>(region.clone())
            .unwrap();

        // Should compress to 6 runs of length 1; still must
        // round-trip.
        vm.compress_adjacent_in_place();
        let collected: Vec<_> = vm.values().collect();
        assert_eq!(collected, vec![0, 10, 20, 30, 40, 50]);
        // Spot-check get()
        assert_eq!(vm.get(4), Some(&40));
    }

    #[test]
    fn rle_handles_singleton_mesh() {
        let region: Region = extent!(n = 1).into();
        let mut vm = ValueMesh::new_unchecked(region.clone(), vec![123]);

        vm.compress_adjacent_in_place();
        let collected: Vec<_> = vm.values().collect();
        assert_eq!(collected, vec![123]);
        assert_eq!(vm.get(0), Some(&123));
        assert_eq!(vm.get(1), None);
    }

    #[test]
    fn test_dense_round_trip_json() {
        // Build a simple dense mesh of 5 integers.
        let region: Region = extent!(x = 5).into();
        let dense = ValueMesh::new(region.clone(), vec![1, 2, 3, 4, 5]).unwrap();

        let json = serde_json::to_string_pretty(&dense).unwrap();
        let restored: ValueMesh<i32> = serde_json::from_str(&json).unwrap();

        assert_eq!(dense, restored);

        // Dense meshes should stay dense on the wire: check the
        // tagged variant.
        let v: serde_json::Value = serde_json::from_str(&json).unwrap();
        // The enum tag is nested: {"rep": {"Dense": {...}}}.
        let tag = v.get("rep").and_then(|o| o.get("Dense"));
        assert!(tag.is_some(), "json is {}", json);
    }

    #[test]
    fn test_dense_round_trip_bincode() {
        // Build a simple dense mesh of 5 integers.
        let region: Region = extent!(x = 5).into();
        let dense = ValueMesh::new(region.clone(), vec![1, 2, 3, 4, 5]).unwrap();

        let encoded = bincode::serialize(&dense).unwrap();
        let restored: ValueMesh<i32> = bincode::deserialize(&encoded).unwrap();

        assert_eq!(dense, restored);
        // Dense meshes should stay dense on the wire.
        assert!(matches!(restored.rep, Rep::Dense { .. }));
    }

    #[test]
    fn test_compressed_round_trip_json() {
        // Build a dense mesh, compress it, and verify it stays
        // compressed on the wire.
        let region: Region = extent!(x = 10).into();
        let mut mesh = ValueMesh::new(region.clone(), vec![1, 1, 1, 2, 2, 3, 3, 3, 3, 3]).unwrap();
        mesh.compress_adjacent_in_place();

        let json = serde_json::to_string_pretty(&mesh).unwrap();
        let restored: ValueMesh<i32> = serde_json::from_str(&json).unwrap();

        // Logical equality preserved.
        assert_eq!(mesh, restored);

        // Compressed meshes should stay compressed on the wire.
        let v: serde_json::Value = serde_json::from_str(&json).unwrap();
        // The enum tag is nested: {"rep": {"Compressed": {...}}}.
        let tag = v.get("rep").and_then(|o| o.get("Compressed"));
        assert!(tag.is_some(), "json is {}", json);
    }

    #[test]
    fn test_compressed_round_trip_bincode() {
        // Build a dense mesh, compress it, and verify it stays
        // compressed on the wire.
        let region: Region = extent!(x = 10).into();
        let mut mesh = ValueMesh::new(region.clone(), vec![1, 1, 1, 2, 2, 3, 3, 3, 3, 3]).unwrap();
        mesh.compress_adjacent_in_place();

        let encoded = bincode::serialize(&mesh).unwrap();
        let restored: ValueMesh<i32> = bincode::deserialize(&encoded).unwrap();

        // Logical equality preserved.
        assert_eq!(mesh, restored);
        // Compressed meshes should stay compressed on the wire.
        assert!(matches!(restored.rep, Rep::Compressed { .. }));
    }

    #[test]
    fn test_stable_run_encoding() {
        let run = Run::new(0, 10, 42);
        let json = serde_json::to_string(&run).unwrap();
        let decoded: Run = serde_json::from_str(&json).unwrap();

        assert_eq!(run, decoded);
        assert_eq!(run.start, 0);
        assert_eq!(run.end, 10);
        assert_eq!(run.id, 42);
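
        // The derived `Serialize` impl writes fields in declaration
        // order, so the compact wire form is a plain JSON object
        // (illustrative check):
        assert_eq!(json, r#"{"start":0,"end":10,"id":42}"#);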

        // Ensure conversion back to Range<usize> works.
        let (range, id): (Range<usize>, u32) = run.try_into().unwrap();
        assert_eq!(range, 0..10);
        assert_eq!(id, 42);
    }

    #[test]
    fn from_single_builds_single_run() {
        let region: Region = extent!(n = 6).into();
        let vm = ValueMesh::from_single(region.clone(), 7);

        assert_eq!(vm.region(), &region);
        assert_eq!(vm.values().collect::<Vec<_>>(), vec![7, 7, 7, 7, 7, 7]);
        assert_eq!(vm.get(0), Some(&7));
        assert_eq!(vm.get(5), Some(&7));
        assert_eq!(vm.get(6), None);
    }
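
    // An extra sanity check (a sketch reusing the same `range`/`sliced`
    // calls exercised in `rle_roundtrip_blocky_and_slice`): slicing a
    // constant mesh should yield a constant mesh over the sub-region.
    #[test]
    fn from_single_slices_to_constant() {
        let region: Region = extent!(n = 6).into();
        let vm = ValueMesh::from_single(region.clone(), 7);

        let sub = region.range("n", 2..5).unwrap();
        let sliced = vm.sliced(sub);
        assert_eq!(sliced.values().collect::<Vec<_>>(), vec![7, 7, 7]);
    }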

    #[test]
    fn from_default_builds_with_default_value() {
        let region: Region = extent!(n = 6).into();
        let vm = ValueMesh::<i32>::from_default(region.clone());

        assert_eq!(vm.region(), &region);
        // i32::default() == 0
        assert_eq!(vm.values().collect::<Vec<_>>(), vec![0, 0, 0, 0, 0, 0]);
        assert_eq!(vm.get(0), Some(&0));
        assert_eq!(vm.get(5), Some(&0));
    }

    #[test]
    fn test_default_vs_single_equivalence() {
        let region: Region = extent!(x = 4).into();
        let d1 = ValueMesh::<i32>::from_default(region.clone());
        let d2 = ValueMesh::from_single(region.clone(), 0);
        assert_eq!(d1, d2);
    }

    #[test]
    fn build_from_ranges_with_default_basic() {
        let region: Region = extent!(n = 10).into();
        let vm = ValueMesh::from_ranges_with_default(
            region.clone(),
            0, // default
            vec![(2..4, 1), (6..9, 2)],
        )
        .unwrap();

        assert_eq!(vm.region(), &region);
        assert_eq!(
            vm.values().collect::<Vec<_>>(),
            vec![0, 0, 1, 1, 0, 0, 2, 2, 2, 0]
        );

        // Internal shape: [0..2)->0, [2..4)->1, [4..6)->0, [6..9)->2,
        // [9..10)->0
        if let Rep::Compressed { table, runs } = &vm.rep {
            // Table is small and de-duplicated.
            assert!(table.len() <= 3);
            assert_eq!(runs.len(), 5);
        } else {
            panic!("expected compressed");
        }
    }

    #[test]
    fn build_from_ranges_with_default_edge_cases() {
        let region: Region = extent!(n = 5).into();

        // Full override covers entire region.
        let vm = ValueMesh::from_ranges_with_default(region.clone(), 9, vec![(0..5, 3)]).unwrap();
        assert_eq!(vm.values().collect::<Vec<_>>(), vec![3, 3, 3, 3, 3]);

        // Adjacent overrides and default gaps.
        let vm = ValueMesh::from_ranges_with_default(region.clone(), 0, vec![(1..2, 7), (2..4, 7)])
            .unwrap();
        assert_eq!(vm.values().collect::<Vec<_>>(), vec![0, 7, 7, 7, 0]);

        // Empty region.
        let empty_region: Region = extent!(n = 0).into();
        let vm = ValueMesh::from_ranges_with_default(empty_region.clone(), 42, vec![]).unwrap();
        assert_eq!(vm.values().collect::<Vec<_>>(), Vec::<i32>::new());
    }

    #[test]
    fn from_dense_builds_and_compresses() {
        let region: Region = extent!(n = 6).into();
        let mesh = ValueMesh::from_dense(region.clone(), vec![1, 1, 2, 2, 3, 3]).unwrap();

        assert_eq!(mesh.region(), &region);
        assert!(matches!(mesh.rep, Rep::Compressed { .. }));
        assert_eq!(mesh.values().collect::<Vec<_>>(), vec![1, 1, 2, 2, 3, 3]);

        // Spot-check indexing.
        assert_eq!(mesh.get(0), Some(&1));
        assert_eq!(mesh.get(3), Some(&2));
        assert_eq!(mesh.get(5), Some(&3));
    }

    #[test]
    fn merge_from_overlay_basic() {
        // Base mesh with three contiguous runs.
        let region: Region = extent!(n = 8).into();
        let mut mesh = ValueMesh::from_dense(region.clone(), vec![1, 1, 1, 2, 2, 2, 3, 3]).unwrap();

        // Overlay replaces middle segment [2..6) with 9s.
        let overlay = ValueOverlay::try_from_runs(vec![(2..6, 9)]).unwrap();

        mesh.merge_from_overlay(overlay).unwrap();

        // Materialize back into ranges to inspect.
        let out = mesh.materialized_runs();

        // Expected: left prefix (0..2)=1, replaced middle (2..6)=9, tail (6..8)=3.
        assert_eq!(out, vec![(0..2, 1), (2..6, 9), (6..8, 3)]);
    }

    #[test]
    fn merge_from_overlay_multiple_spans() {
        // Build a mesh with four equal-length runs.
        let region: Region = extent!(m = 12).into();
        let mut mesh =
            ValueMesh::from_dense(region.clone(), vec![1, 1, 1, 2, 2, 2, 3, 3, 3, 4, 4, 4])
                .unwrap();
        // The overlay has one run that spans the boundary between two
        // base runs, plus a second, disjoint run later on.
        let overlay = ValueOverlay::try_from_runs(vec![(2..6, 9), (9..11, 8)]).unwrap();

        mesh.merge_from_overlay(overlay).unwrap();
        let out = mesh.materialized_runs();

        // Expected after merge and re-compression:
        // (0..2, 1)   untouched head of the 1-run
        // (2..6, 9)   overwrites the tail of the 1-run and all of the 2-run
        // (6..9, 3)   the 3-run survives intact
        // (9..11, 8)  overwrites the head of the 4-run
        // (11..12, 4) surviving tail of the 4-run
        assert_eq!(
            out,
            vec![(0..2, 1), (2..6, 9), (6..9, 3), (9..11, 8), (11..12, 4)]
        );
    }

    #[test]
    fn merge_from_overlay_crosses_row_boundary() {
        // 2 x 5 region -> 10 linear ranks in row-major order.
        let region: Region = extent!(rows = 2, cols = 5).into();

        // Dense values laid out row-major:
        // row 0: [1, 1, 1, 2, 2]
        // row 1: [3, 3, 4, 4, 4]
        let mut mesh =
            ValueMesh::from_dense(region.clone(), vec![1, 1, 1, 2, 2, 3, 3, 4, 4, 4]).unwrap();

        // Overlay that crosses the row boundary:
        // linear ranks [3..7) -> 9
        //   - tail of row 0: indices 3,4 (the two 2s)
        //   - head of row 1: indices 5,6 (the two 3s)
        let overlay = ValueOverlay::try_from_runs(vec![(3..7, 9)]).unwrap();

        mesh.merge_from_overlay(overlay).unwrap();

        // After merge, the dense view should be:
        // [1,1,1, 9,9, 9,9, 4,4,4]
        let flat: Vec<_> = mesh.values().collect();
        assert_eq!(flat, vec![1, 1, 1, 9, 9, 9, 9, 4, 4, 4]);

        // And the materialized runs should reflect that:
        // (0..3,1) | (3..7,9) | (7..10,4)
        let runs = mesh.materialized_runs();
        assert_eq!(runs, vec![(0..3, 1), (3..7, 9), (7..10, 4)]);
    }
}