hyperactor_mesh/
value_mesh.rs

/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 * All rights reserved.
 *
 * This source code is licensed under the BSD-style license found in the
 * LICENSE file in the root directory of this source tree.
 */
8
//! ## Value mesh invariants (VM-*)
//!
//! - **VM-1 (completeness):** Every rank in `region` has exactly one
//!   value. Iteration and indexing follow the region's linearization.
13
14use std::cmp::Ordering;
15use std::collections::HashMap;
16use std::collections::hash_map::Entry;
17use std::hash::Hash;
18use std::marker::PhantomData;
19use std::mem;
20use std::mem::MaybeUninit;
21use std::ops::Range;
22use std::ptr;
23use std::ptr::NonNull;
24
25use futures::Future;
26use hyperactor::accum::Accumulator;
27use hyperactor::accum::CommReducer;
28use hyperactor::accum::ReducerFactory;
29use hyperactor::accum::ReducerSpec;
30use ndslice::extent;
31use ndslice::view;
32use ndslice::view::Ranked;
33use ndslice::view::Region;
34use serde::Deserialize;
35use serde::Serialize;
36use typeuri::Named;
37
38mod rle;
39mod value_overlay;
40pub use value_overlay::BuildError;
41pub use value_overlay::ValueOverlay;
42
/// A mesh of values, one per rank in `region`.
///
/// The internal representation (`rep`) may be dense or compressed,
/// but externally the mesh always behaves as a complete mapping from
/// rank index → value.
///
/// See VM-1 in the module docs: every rank has exactly one value,
/// and iteration/indexing follow the region's linearization.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)] // derives hold only if T implements them
pub struct ValueMesh<T> {
    /// The logical multidimensional domain of the mesh.
    ///
    /// Determines the number of ranks (`region.num_ranks()`) and the
    /// order in which they are traversed.
    region: Region,

    /// The underlying storage representation.
    ///
    /// - `Rep::Dense` stores a `Vec<T>` with one value per rank.
    /// - `Rep::Compressed` stores a run-length encoded table of
    ///   unique values plus `(Range<usize>, u32)` pairs describing
    ///   contiguous runs of identical values.
    ///
    /// The representation is an internal optimization detail; all
    /// public APIs (e.g. `values()`, `get()`, slicing) behave as if
    /// the mesh were dense.
    rep: Rep<T>,
}
70
/// A single run-length–encoded (RLE) segment within a [`ValueMesh`].
///
/// Each `Run` represents a contiguous range of ranks `[start, end)`
/// that all share the same value, referenced indirectly via a table
/// index `id`. This allows compact storage of large regions with
/// repeated values. See VM-2 on [`Rep::Compressed`] for the
/// invariants runs must satisfy.
///
/// Runs are serialized in a stable, portable format using `u64` for
/// range bounds (`start`, `end`) to avoid platform-dependent `usize`
/// encoding differences.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
struct Run {
    /// Inclusive start of the contiguous range of ranks (0-based).
    start: u64,
    /// Exclusive end of the contiguous range of ranks (0-based).
    end: u64,
    /// Index into the value table for this run's shared value.
    id: u32,
}
90
91impl Run {
92    /// Creates a new `Run` covering ranks `[start, end)` that all
93    /// share the same table entry `id`.
94    ///
95    /// Converts `usize` bounds to `u64` for stable serialization.
96    fn new(start: usize, end: usize, id: u32) -> Self {
97        Self {
98            start: start as u64,
99            end: end as u64,
100            id,
101        }
102    }
103}
104
105impl TryFrom<Run> for (Range<usize>, u32) {
106    type Error = &'static str;
107
108    /// Converts a serialized [`Run`] back into its in-memory form
109    /// `(Range<usize>, u32)`.
110    ///
111    /// Performs checked conversion of the 64-bit wire fields back
112    /// into `usize` indices, returning an error if either bound
113    /// exceeds the platform's addressable range. This ensures safe
114    /// round-tripping between the serialized wire format and native
115    /// representation.
116    #[allow(clippy::result_large_err)]
117    fn try_from(r: Run) -> Result<Self, Self::Error> {
118        let start = usize::try_from(r.start).map_err(|_| "run.start too large")?;
119        let end = usize::try_from(r.end).map_err(|_| "run.end too large")?;
120        Ok((start..end, r.id))
121    }
122}
123
/// Internal storage representation for a [`ValueMesh`].
///
/// This enum abstracts how the per-rank values are stored.
/// Externally, both variants behave identically — the difference is
/// purely in memory layout and access strategy.
///
/// - [`Rep::Dense`] stores one value per rank, directly.
/// - [`Rep::Compressed`] stores a compact run-length-encoded form,
///   reusing identical values across contiguous ranks.
///
/// Users of [`ValueMesh`] normally never interact with `Rep`
/// directly; all iteration and slicing APIs present a dense logical
/// view.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)] // derives hold only if T implements them
enum Rep<T> {
    /// Fully expanded representation: one element per rank.
    ///
    /// The length of `values` is always equal to
    /// `region.num_ranks()`. This form is simple and fast for
    /// iteration and mutation but uses more memory when large runs of
    /// repeated values are present.
    Dense {
        /// Flat list of values, one per rank in the region's
        /// linearization order.
        values: Vec<T>,
    },

    /// Run-length-encoded representation.
    ///
    /// Each run `(Range<usize>, id)` indicates that the ranks within
    /// `Range` (half-open `[start, end)`) share the same value at
    /// `table[id]`. The `table` stores each distinct value once.
    ///
    /// # Invariants (VM-2)
    /// - **VM-2 (runs-contiguous):** Runs are non-empty and contiguous (`r.start < r.end`).
    /// - Runs collectively cover `0..region.num_ranks()` with no gaps
    ///   or overlaps.
    /// - `id` indexes into `table` (`id < table.len()`).
    Compressed {
        /// The deduplicated set of unique values referenced by runs.
        table: Vec<T>,

        /// List of `(range, table_id)` pairs describing contiguous
        /// runs of identical values in region order.
        runs: Vec<Run>,
    },
}
171
172// At this time, Default is used primarily to satisfy the mailbox
173// Accumulator bound. It constructs an empty (zero-rank) mesh. Other
174// contexts may also use this as a generic "empty mesh" initializer
175// when a concrete region is not yet known.
176impl<T> Default for ValueMesh<T> {
177    fn default() -> Self {
178        Self::empty()
179    }
180}
181
impl<T> ValueMesh<T> {
    /// Returns an *empty* mesh: a 1-dimensional region of length 0.
    ///
    /// This differs from a *dimensionless* (0-D) region, which
    /// represents a single scalar element. A zero-length 1-D region
    /// has **no elements at all**, making it the natural `Default`
    /// placeholder for accumulator state initialization.
    pub fn empty() -> Self {
        // Zero-rank region; no constraints on T.
        let region = extent!(r = 0).into();
        Self::new_unchecked(region, Vec::<T>::new())
    }

    /// Creates a new `ValueMesh` for `region` with exactly one value
    /// per rank.
    ///
    /// # Invariants
    /// This constructor validates that the number of provided values
    /// (`ranks.len()`) matches the region's cardinality
    /// (`region.num_ranks()`). A value mesh must be complete (VM-1):
    /// every rank in `region` has a corresponding `T`.
    ///
    /// # Errors
    /// Returns [`Error::InvalidRankCardinality`] if `ranks.len() !=
    /// region.num_ranks()`.
    #[allow(clippy::result_large_err)]
    pub(crate) fn new(region: Region, ranks: Vec<T>) -> crate::Result<Self> {
        let (actual, expected) = (ranks.len(), region.num_ranks());
        if actual != expected {
            return Err(crate::Error::InvalidRankCardinality { expected, actual });
        }
        Ok(Self {
            region,
            rep: Rep::Dense { values: ranks },
        })
    }

    /// Constructs a `ValueMesh` without checking cardinality. Caller
    /// must ensure `ranks.len() == region.num_ranks()`.
    #[inline]
    pub(crate) fn new_unchecked(region: Region, ranks: Vec<T>) -> Self {
        debug_assert_eq!(region.num_ranks(), ranks.len());
        Self {
            region,
            rep: Rep::Dense { values: ranks },
        }
    }
}
231
232impl<T: Clone> ValueMesh<T> {
233    /// Builds a `ValueMesh` that assigns the single value `s` to
234    /// every rank in `region`, without materializing a dense
235    /// `Vec<T>`. The result is stored in compressed (RLE) form as a
236    /// single run `[0..N)`.
237    ///
238    /// If `region.num_ranks() == 0`, the mesh contains no runs (and
239    /// an empty table), regardless of `s`.
240    pub fn from_single(region: Region, s: T) -> Self {
241        let n = region.num_ranks();
242        if n == 0 {
243            return Self {
244                region,
245                rep: Rep::Compressed {
246                    table: Vec::new(),
247                    runs: Vec::new(),
248                },
249            };
250        }
251
252        let table = vec![s];
253        let runs = vec![Run::new(0, n, 0)];
254        Self {
255            region,
256            rep: Rep::Compressed { table, runs },
257        }
258    }
259}
260
261impl<T: Clone + Default> ValueMesh<T> {
262    /// Builds a [`ValueMesh`] covering the region, filled with
263    /// `T::default()`.
264    ///
265    /// Equivalent to [`ValueMesh::from_single(region,
266    /// T::default())`].
267    pub fn from_default(region: Region) -> Self {
268        ValueMesh::<T>::from_single(region, T::default())
269    }
270}
271
272impl<T: Eq + Hash> ValueMesh<T> {
273    /// Builds a compressed mesh from a default value and a set of
274    /// disjoint ranges that override the default.
275    ///
276    /// - `ranges` may be in any order; they must be non-empty,
277    ///   in-bounds, and non-overlapping.
278    /// - Unspecified ranks are filled with `default`.
279    /// - Result is stored in RLE form; no dense `Vec<T>` is
280    ///   materialized.
281    #[allow(clippy::result_large_err)]
282    pub fn from_ranges_with_default(
283        region: Region,
284        default: T,
285        mut ranges: Vec<(Range<usize>, T)>,
286    ) -> crate::Result<Self> {
287        let n = region.num_ranks();
288
289        if n == 0 {
290            return Ok(Self {
291                region,
292                rep: Rep::Compressed {
293                    table: Vec::new(),
294                    runs: Vec::new(),
295                },
296            });
297        }
298
299        // Validate: non-empty, in-bounds; then sort.
300        for (r, _) in &ranges {
301            if r.is_empty() {
302                return Err(crate::Error::InvalidRankCardinality {
303                    expected: n,
304                    actual: 0,
305                }); // TODO: this surfaces the error but its not a great fit
306            }
307            if r.end > n {
308                return Err(crate::Error::InvalidRankCardinality {
309                    expected: n,
310                    actual: r.end,
311                });
312            }
313        }
314        ranges.sort_by_key(|(r, _)| (r.start, r.end));
315
316        // Validate: non-overlapping.
317        for w in ranges.windows(2) {
318            let (a, _) = &w[0];
319            let (b, _) = &w[1];
320            if a.end > b.start {
321                // Overlap
322                return Err(crate::Error::InvalidRankCardinality {
323                    expected: n,
324                    actual: b.start, // TODO: this surfaces the error but is a bad fit
325                });
326            }
327        }
328
329        // Internal index: value -> table id (assigned once).
330        let mut index: HashMap<T, u32> = HashMap::with_capacity(1 + ranges.len());
331        let mut next_id: u32 = 0;
332
333        // Helper: assign or reuse an id by value, taking ownership of
334        // v.
335        let id_of = |v: T, index: &mut HashMap<T, u32>, next_id: &mut u32| -> u32 {
336            match index.entry(v) {
337                Entry::Occupied(o) => *o.get(),
338                Entry::Vacant(vac) => {
339                    let id = *next_id;
340                    *next_id += 1;
341                    vac.insert(id);
342                    id
343                }
344            }
345        };
346
347        let default_id = id_of(default, &mut index, &mut next_id);
348
349        let mut runs: Vec<Run> = Vec::with_capacity(1 + 2 * ranges.len());
350        let mut cursor = 0usize;
351
352        for (r, v) in ranges.into_iter() {
353            // Fill default gap if any.
354            if cursor < r.start {
355                runs.push(Run::new(cursor, r.start, default_id));
356            }
357            // Override block.
358            let id = id_of(v, &mut index, &mut next_id);
359            runs.push(Run::new(r.start, r.end, id));
360            cursor = r.end;
361        }
362
363        // Trailing default tail.
364        if cursor < n {
365            runs.push(Run::new(cursor, n, default_id));
366        }
367
368        // Materialize table in id order without cloning: move keys
369        // out of the map.
370        let mut table_slots: Vec<Option<T>> = Vec::new();
371        table_slots.resize_with(next_id as usize, || None);
372
373        for (t, id) in index.into_iter() {
374            table_slots[id as usize] = Some(t);
375        }
376
377        let table: Vec<T> = table_slots
378            .into_iter()
379            .map(|o| o.expect("every id must be assigned"))
380            .collect();
381
382        Ok(Self {
383            region,
384            rep: Rep::Compressed { table, runs },
385        })
386    }
387
388    /// Builds a [`ValueMesh`] from a fully materialized dense vector
389    /// of per-rank values, then compresses it into run-length–encoded
390    /// form if possible.
391    ///
392    /// This constructor is intended for callers that already have one
393    /// value per rank (e.g. computed or received data) but wish to
394    /// store it efficiently.
395    ///
396    /// # Parameters
397    /// - `region`: The logical region describing the mesh's shape and
398    ///   rank order.
399    /// - `values`: A dense vector of values, one per rank in
400    ///   `region`.
401    ///
402    /// # Returns
403    /// A [`ValueMesh`] whose internal representation is `Compressed`
404    /// if any adjacent elements are equal, or `Dense` if no
405    /// compression was possible.
406    ///
407    /// # Errors
408    /// Returns an error if the number of provided `values` does not
409    /// match the number of ranks in `region`.
410    ///
411    /// # Examples
412    /// ```ignore
413    /// let region: Region = extent!(n = 5).into();
414    /// let mesh = ValueMesh::from_dense(region, vec![1, 1, 2, 2, 3]).unwrap();
415    /// // Internally compressed to three runs: [1, 1], [2, 2], [3]
416    /// ```
417    #[allow(clippy::result_large_err)]
418    pub fn from_dense(region: Region, values: Vec<T>) -> crate::Result<Self> {
419        let mut vm = Self::new(region, values)?;
420        vm.compress_adjacent_in_place();
421        Ok(vm)
422    }
423}
424
425impl<F: Future> ValueMesh<F> {
426    /// Await all futures in the mesh, yielding a `ValueMesh` of their
427    /// outputs.
428    pub async fn join(self) -> ValueMesh<F::Output> {
429        let ValueMesh { region, rep } = self;
430
431        match rep {
432            Rep::Dense { values } => {
433                let results = futures::future::join_all(values).await;
434                ValueMesh::new_unchecked(region, results)
435            }
436            Rep::Compressed { .. } => {
437                unreachable!("join() not implemented for compressed meshes")
438            }
439        }
440    }
441}
442
443impl<T, E> ValueMesh<Result<T, E>> {
444    /// Transposes a `ValueMesh<Result<T, E>>` into a
445    /// `Result<ValueMesh<T>, E>`.
446    pub fn transpose(self) -> Result<ValueMesh<T>, E> {
447        let ValueMesh { region, rep } = self;
448
449        match rep {
450            Rep::Dense { values } => {
451                let values = values.into_iter().collect::<Result<Vec<T>, E>>()?;
452                Ok(ValueMesh::new_unchecked(region, values))
453            }
454            Rep::Compressed { table, runs } => {
455                let table: Vec<T> = table.into_iter().collect::<Result<Vec<T>, E>>()?;
456                Ok(ValueMesh {
457                    region,
458                    rep: Rep::Compressed { table, runs },
459                })
460            }
461        }
462    }
463}
464
465impl<T: 'static> view::Ranked for ValueMesh<T> {
466    type Item = T;
467
468    /// Returns the region that defines this mesh's shape and rank
469    /// order.
470    fn region(&self) -> &Region {
471        &self.region
472    }
473
474    /// Looks up the value at the given linearized rank.
475    ///
476    /// Works transparently for both dense and compressed
477    /// representations:
478    /// - In the dense case, it simply indexes into the `values`
479    ///   vector.
480    /// - In the compressed case, it performs a binary search over run
481    ///   boundaries to find which run contains the given rank, then
482    ///   returns the corresponding entry from `table`.
483    ///
484    /// Returns `None` if the rank is out of bounds.
485    fn get(&self, rank: usize) -> Option<&Self::Item> {
486        if rank >= self.region.num_ranks() {
487            return None;
488        }
489
490        match &self.rep {
491            Rep::Dense { values } => values.get(rank),
492
493            Rep::Compressed { table, runs } => {
494                let rank = rank as u64;
495
496                // Binary search over runs: find the one whose range
497                // contains `rank`.
498                let idx = runs
499                    .binary_search_by(|run| {
500                        if run.end <= rank {
501                            Ordering::Less
502                        } else if run.start > rank {
503                            Ordering::Greater
504                        } else {
505                            Ordering::Equal
506                        }
507                    })
508                    .ok()?;
509
510                // Map the run's table ID to its actual value.
511                let id = runs[idx].id as usize;
512                table.get(id)
513            }
514        }
515    }
516}
517
518impl<T: Clone + 'static> view::RankedSliceable for ValueMesh<T> {
519    fn sliced(&self, region: Region) -> Self {
520        debug_assert!(region.is_subset(self.region()), "sliced: not a subset");
521        let ranks: Vec<T> = self
522            .region()
523            .remap(&region)
524            .unwrap()
525            .map(|index| self.get(index).unwrap().clone())
526            .collect();
527        debug_assert_eq!(
528            region.num_ranks(),
529            ranks.len(),
530            "sliced: cardinality mismatch"
531        );
532        Self::new_unchecked(region, ranks)
533    }
534}
535
536impl<T> view::BuildFromRegion<T> for ValueMesh<T> {
537    type Error = crate::Error;
538
539    fn build_dense(region: Region, values: Vec<T>) -> Result<Self, Self::Error> {
540        Self::new(region, values)
541    }
542
543    fn build_dense_unchecked(region: Region, values: Vec<T>) -> Self {
544        Self::new_unchecked(region, values)
545    }
546}
547
impl<T> view::BuildFromRegionIndexed<T> for ValueMesh<T> {
    type Error = crate::Error;

    /// Builds a dense mesh from `(rank, value)` pairs supplied in
    /// arbitrary order.
    ///
    /// Every rank in `0..region.num_ranks()` must appear for the
    /// build to succeed. Duplicate ranks are tolerated: the earlier
    /// value is dropped and the last write wins.
    ///
    /// # Errors
    /// Returns [`crate::Error::InvalidRankCardinality`] if a pair's
    /// rank is out of bounds, or if fewer than `region.num_ranks()`
    /// distinct ranks were supplied.
    fn build_indexed(
        region: Region,
        pairs: impl IntoIterator<Item = (usize, T)>,
    ) -> Result<Self, Self::Error> {
        let n = region.num_ranks();

        // Allocate uninitialized buffer for T.
        // Note: Vec<MaybeUninit<T>>'s Drop will only free the
        // allocation; it never runs T's destructor. We must
        // explicitly drop any initialized elements (DropGuard) or
        // convert into Vec<T>.
        let mut buf: Vec<MaybeUninit<T>> = Vec::with_capacity(n);
        // SAFETY: set `len = n` to treat the buffer as n uninit slots
        // of `MaybeUninit<T>`. We never read before `ptr::write`,
        // drop only slots marked initialized (bitset), and convert to
        // `Vec<T>` only once all 0..n are initialized (guard enforces
        // this).
        unsafe {
            buf.set_len(n);
        }

        // Compact bitset for occupancy: one bit per rank.
        let words = n.div_ceil(64);
        let mut bits = vec![0u64; words];
        let mut filled = 0usize;

        // Drop guard: cleans up initialized elements on early exit.
        // Stores raw, non-borrowed pointers (`NonNull`), so we don't
        // hold Rust references for the whole scope. This allows
        // mutating `buf`/`bits` inside the loop while still letting
        // the guard access them if dropped early.
        struct DropGuard<T> {
            buf: NonNull<MaybeUninit<T>>,
            bits: NonNull<u64>,
            n_elems: usize,
            n_words: usize,
            disarm: bool,
        }

        impl<T> DropGuard<T> {
            /// # Safety
            /// - `buf` points to `buf.len()` contiguous
            ///   `MaybeUninit<T>` and outlives the guard.
            /// - `bits` points to `bits.len()` contiguous `u64` words
            ///   and outlives the guard.
            /// - For every set bit `i` in `bits`, `buf[i]` has been
            ///   initialized with a valid `T` (and on duplicates, the
            ///   previous `T` at `i` was dropped before overwrite).
            /// - While the guard is alive, caller may mutate
            ///   `buf`/`bits` (the guard holds only raw pointers; it
            ///   does not create Rust borrows).
            /// - On drop (when not disarmed), the guard will read
            ///   `bits`, mask tail bits in the final word, and
            ///   `drop_in_place` each initialized `buf[i]`—never
            ///   accessing beyond `buf.len()` / `bits.len()`.
            /// - If a slice is empty, the stored pointer may be
            ///   `dangling()`; it is never dereferenced when the
            ///   corresponding length is zero.
            unsafe fn new(buf: &mut [MaybeUninit<T>], bits: &mut [u64]) -> Self {
                let n_elems = buf.len();
                let n_words = bits.len();
                // Expected: n_words == (n_elems + 63) / 64
                // but we don't *require* it; tail is masked in Drop.
                Self {
                    buf: NonNull::new(buf.as_mut_ptr()).unwrap_or_else(NonNull::dangling),
                    bits: NonNull::new(bits.as_mut_ptr()).unwrap_or_else(NonNull::dangling),
                    n_elems,
                    n_words,
                    disarm: false,
                }
            }

            // Disable cleanup once ownership of the initialized
            // elements has been transferred out (success path).
            #[inline]
            fn disarm(&mut self) {
                self.disarm = true;
            }
        }

        impl<T> Drop for DropGuard<T> {
            fn drop(&mut self) {
                if self.disarm {
                    return;
                }

                // SAFETY:
                // - `self.buf` points to `n_elems` contiguous
                //   `MaybeUninit<T>` slots (or may be dangling if
                //   `n_elems == 0`); `self.bits` points to `n_words`
                //   contiguous `u64` words (or may be dangling if
                //   `n_words == 0`).
                // - Loop bounds ensure `w < n_words` when reading
                //   `*bits_base.add(w)`.
                // - For the final word we mask unused tail bits so
                //   any computed index `i = w * 64 + tz` always
                //   satisfies `i < n_elems` before we dereference
                //   `buf_base.add(i)`.
                // - Only slots whose bits are set are dropped, so no
                //   double-drops.
                // - No aliasing with active Rust borrows: the guard
                //   holds raw pointers and runs in `Drop` after the
                //   fill loop.
                unsafe {
                    let buf_base = self.buf.as_ptr();
                    let bits_base = self.bits.as_ptr();

                    for w in 0..self.n_words {
                        // Load word.
                        let mut word = *bits_base.add(w);

                        // Mask off bits beyond `n_elems` in the final
                        // word (if any).
                        if w == self.n_words.saturating_sub(1) {
                            let used_bits = self.n_elems.saturating_sub(w * 64);
                            if used_bits < 64 {
                                let mask = if used_bits == 0 {
                                    0
                                } else {
                                    (1u64 << used_bits) - 1
                                };
                                word &= mask;
                            }
                        }

                        // Fast scan set bits.
                        while word != 0 {
                            let tz = word.trailing_zeros() as usize;
                            let i = w * 64 + tz;
                            debug_assert!(i < self.n_elems);

                            let slot = buf_base.add(i);
                            // Drop the initialized element.
                            ptr::drop_in_place((*slot).as_mut_ptr());

                            // clear the bit we just handled
                            word &= word - 1;
                        }
                    }
                }
            }
        }

        // SAFETY:
        // - `buf` and `bits` are freshly allocated Vecs with
        //   capacity/len set to cover exactly `n_elems` and `n_words`,
        //   so their `.as_mut_ptr()` is valid for that many elements.
        // - Both slices live at least as long as the guard, and are
        //   not moved until after the guard is disarmed.
        // - No aliasing occurs: the guard holds only raw pointers and
        //   the fill loop mutates through those same allocations.
        let mut guard = unsafe { DropGuard::new(&mut buf, &mut bits) };

        for (rank, value) in pairs {
            // Single bounds check up front.
            if rank >= guard.n_elems {
                return Err(crate::Error::InvalidRankCardinality {
                    expected: guard.n_elems,
                    actual: rank + 1,
                });
            }

            // Compute word index and bit mask once.
            let w = rank / 64;
            let b = rank % 64;
            let mask = 1u64 << b;

            // SAFETY:
            // - `rank < guard.n_elems` was checked above, so
            //   `buf_slot = buf.add(rank)` is within the
            //   `Vec<MaybeUninit<T>>` allocation.
            // - `w = rank / 64` and `bits.len() == (n + 63) / 64`
            //   ensure `bits_ptr = bits.add(w)` is in-bounds.
            // - If `(word & mask) != 0`, then this slot was
            //   previously initialized;
            //   `drop_in_place((*buf_slot).as_mut_ptr())` is valid and
            //   leaves the slot uninitialized.
            // - If the bit was clear, we set it and count `filled +=
            //   1`.
            // - `(*buf_slot).write(value)` is valid in both cases:
            //   either writing into an uninitialized slot or
            //   immediately after dropping the prior `T`.
            // - No aliasing with Rust references: the guard holds raw
            //   pointers and we have exclusive ownership of
            //   `buf`/`bits` within this function.
            unsafe {
                // Pointers from the guard (no long-lived & borrows).
                let bits_ptr = guard.bits.as_ptr().add(w);
                let buf_slot = guard.buf.as_ptr().add(rank);

                // Read the current word.
                let word = *bits_ptr;

                if (word & mask) != 0 {
                    // Duplicate: drop old value before overwriting.
                    core::ptr::drop_in_place((*buf_slot).as_mut_ptr());
                    // (Bit already set; no need to set again; don't
                    // bump `filled`.)
                } else {
                    // First time we see this rank.
                    *bits_ptr = word | mask;
                    filled += 1;
                }

                // Write new value into the slot.
                (*buf_slot).write(value);
            }
        }

        if filled != n {
            // Missing ranks: actual = number of distinct ranks seen.
            return Err(crate::Error::InvalidRankCardinality {
                expected: n,
                actual: filled,
            });
        }

        // Success: prevent cleanup.
        guard.disarm();

        // SAFETY: all n slots are initialized
        let ranks = unsafe {
            let ptr = buf.as_mut_ptr() as *mut T;
            let len = buf.len();
            let cap = buf.capacity();
            // Prevent `buf` (Vec<MaybeUninit<T>>) from freeing the
            // allocation. Ownership of the buffer is about to be
            // transferred to `Vec<T>` via `from_raw_parts`.
            // Forgetting avoids a double free.
            mem::forget(buf);
            Vec::from_raw_parts(ptr, len, cap)
        };

        Ok(Self::new_unchecked(region, ranks))
    }
}
785
786impl<T: PartialEq> ValueMesh<T> {
787    /// Compresses the mesh in place using run-length encoding (RLE).
788    ///
789    /// This method scans the mesh's dense values, coalescing adjacent
790    /// runs of identical elements into a compact [`Rep::Compressed`]
791    /// representation. It replaces the internal storage (`rep`) with
792    /// the compressed form.
793    ///
794    /// # Behavior
795    /// - If the mesh is already compressed, this is a **no-op**.
796    /// - If the mesh is dense, it consumes the current `Vec<T>` and
797    ///   rebuilds the representation as a run table plus value table.
798    /// - Only *adjacent* equal values are merged; non-contiguous
799    ///   duplicates remain distinct.
800    ///
801    /// # Requirements
802    /// - `T` must implement [`PartialEq`] (to detect equal values).
803    ///
804    /// This operation is lossless: expanding the compressed mesh back
805    /// into a dense vector yields the same sequence of values.
806    pub fn compress_adjacent_in_place(&mut self) {
807        self.compress_adjacent_in_place_by(|a, b| a == b)
808    }
809}
810
811impl<T: Clone + PartialEq> ValueMesh<T> {
812    /// Materializes this mesh into a vector of `(Range<usize>, T)`
813    /// runs.
814    ///
815    /// For a dense representation, this walks the value vector and
816    /// groups adjacent equal values into contiguous runs. The result
817    /// is equivalent to what would be stored in a compressed
818    /// representation, but the mesh itself is not mutated or
819    /// re-encoded. This is purely a read-only view.
820    ///
821    /// For a compressed representation, the stored runs are simply
822    /// cloned.
823    ///
824    /// This method is intended for inspection, testing, and
825    /// diff/merge operations that need a uniform view of value runs
826    /// without changing the underlying representation.
827    fn materialized_runs(&self) -> Vec<(Range<usize>, T)> {
828        match &self.rep {
829            Rep::Dense { values } => {
830                // Coalesce adjacent equals into runs.
831                let mut out = Vec::new();
832                if values.is_empty() {
833                    return out;
834                }
835                let mut start = 0usize;
836                for i in 1..values.len() {
837                    if values[i] != values[i - 1] {
838                        out.push((start..i, values[i - 1].clone()));
839                        start = i;
840                    }
841                }
842                out.push((start..values.len(), values.last().unwrap().clone()));
843                out
844            }
845            Rep::Compressed { table, runs } => runs
846                .iter()
847                .map(|r| {
848                    let id = r.id as usize;
849                    ((r.start as usize..r.end as usize), table[id].clone())
850                })
851                .collect(),
852        }
853    }
854}
855
856impl<T: Clone + Eq> ValueMesh<T> {
857    /// Merge a sparse overlay into this mesh.
858    ///
859    /// Overlay segments are applied with **last-writer-wins**
860    /// precedence on overlap (identical to `RankedValues::merge_from`
861    /// behavior). The result is stored compressed.
862    pub fn merge_from_overlay(&mut self, overlay: ValueOverlay<T>) -> Result<(), BuildError> {
863        let n = self.region.num_ranks();
864
865        // Bounds validation (structure already validated by
866        // ValueOverlay).
867        for (r, _) in overlay.runs() {
868            if r.end > n {
869                return Err(BuildError::OutOfBounds {
870                    range: r.clone(),
871                    region_len: n,
872                });
873            }
874        }
875
876        // Left: current mesh as normalized value-bearing runs.
877        let left = self.materialized_runs();
878        // Right: overlay runs (already sorted, non-overlapping,
879        // coalesced).
880        let right: Vec<(std::ops::Range<usize>, T)> = overlay.runs().cloned().collect();
881
882        // Merge with overlay precedence, reusing the same splitting
883        // strategy as RankedValues::merge_from.
884        let merged = rle::merge_value_runs(left, right);
885
886        // Re-encode to compressed representation:
887        let (table, raw_runs) = rle::rle_from_value_runs(merged);
888        let runs = raw_runs
889            .into_iter()
890            .map(|(r, id)| Run::new(r.start, r.end, id))
891            .collect();
892        self.rep = Rep::Compressed { table, runs };
893
894        Ok(())
895    }
896}
897
898impl<T> ValueMesh<T> {
899    /// Compresses the mesh in place using a custom equivalence
900    /// predicate.
901    ///
902    /// This is a generalized form of [`compress_adjacent_in_place`]
903    /// that merges adjacent values according to an arbitrary
904    /// predicate `same(a, b)`, rather than relying on `PartialEq`.
905    ///
906    /// # Behavior
907    /// - If the mesh is already compressed, this is a **no-op**.
908    /// - Otherwise, consumes the dense `Vec<T>` and replaces it with
909    ///   a run-length encoded (`Rep::Compressed`) representation,
910    ///   where consecutive elements satisfying `same(a, b)` are
911    ///   coalesced into a single run.
912    ///
913    /// # Requirements
914    /// - The predicate must be reflexive and symmetric for
915    ///   correctness.
916    ///
917    /// This operation is lossless: expanding the compressed mesh
918    /// reproduces the original sequence exactly under the same
919    /// equivalence.
920    pub fn compress_adjacent_in_place_by<F>(&mut self, same: F)
921    where
922        F: FnMut(&T, &T) -> bool,
923    {
924        let values = match &mut self.rep {
925            Rep::Dense { values } => std::mem::take(values),
926            Rep::Compressed { .. } => return,
927        };
928        let (table, raw_runs) = rle::rle_from_dense(values, same);
929        let runs = raw_runs
930            .into_iter()
931            .map(|(r, id)| Run::new(r.start, r.end, id))
932            .collect();
933        self.rep = Rep::Compressed { table, runs };
934    }
935}
936
937/// Accumulates sparse overlay updates into an authoritative mesh.
938///
939/// Lifecycle:
940/// - Mailbox initializes `State` via `Default` (empty mesh).
941/// - On the first update, the accumulator clones `self` (the template
942///   passed to `open_accum_port_opts`) into `state`. Callers pass a
943///   template such as `StatusMesh::from_single(region, NotExist)`.
944/// - Each overlay update is merged with right-wins semantics via
945///   `merge_from_overlay`, and the accumulator emits a *full*
946///   ValueMesh.
947///
948/// The accumulator's state is a [`ValueMesh<T>`] and its updates are
949/// [`ValueOverlay<T>`] instances. On each update, the overlay’s
950/// normalized runs are merged into the mesh using
951/// [`ValueMesh::merge_from_overlay`] with right-wins semantics.
952///
953/// This enables incremental reduction of sparse updates across
954/// distributed reducers without materializing dense data.
955impl<T> Accumulator for ValueMesh<T>
956where
957    T: Eq + Clone + Named,
958{
959    type State = Self;
960    type Update = ValueOverlay<T>;
961
962    fn accumulate(&self, state: &mut Self::State, update: Self::Update) -> anyhow::Result<()> {
963        // Mailbox starts with A::State::default() (empty). On the
964        // first update, re-initialize to our template (self), which
965        // the caller constructed as "full NotExist over the target
966        // region".
967        if state.region().num_ranks() == 0 {
968            *state = self.clone();
969        }
970
971        // Apply sparse delta into the authoritative mesh
972        // (right-wins).
973        state.merge_from_overlay(update)?;
974        Ok(())
975    }
976
977    fn reducer_spec(&self) -> Option<ReducerSpec> {
978        Some(ReducerSpec {
979            typehash: <ValueOverlayReducer<T> as Named>::typehash(),
980            builder_params: None,
981        })
982    }
983}
984
/// Marker reducer type for [`ValueOverlay<T>`].
///
/// Carries no runtime state; it exists solely to bind a concrete
/// type parameter `T` to the [`CommReducer`] implementation below
/// (and to give that implementation a stable `Named` typehash).
/// Reduction itself is purely functional and uses the right-wins
/// merge semantics defined in [`merge_value_runs`].
#[derive(Named)]
struct ValueOverlayReducer<T>(std::marker::PhantomData<T>);
993
994/// Reducer for sparse overlay updates.
995///
996/// Combines two [`ValueOverlay<T>`] updates using right-wins
997/// semantics: overlapping runs from `right` overwrite those in
998/// `left`. The merged runs are then normalized and validated via
999/// [`ValueOverlay::try_from_runs`].
1000///
1001/// Used by the corresponding [`Accumulator`] to perform distributed
1002/// reduction of incremental mesh updates.
1003impl<T> CommReducer for ValueOverlayReducer<T>
1004where
1005    T: Eq + Clone + Named,
1006{
1007    type Update = ValueOverlay<T>;
1008
1009    // Last-writer-wins merge of two sparse overlays.
1010    fn reduce(&self, left: Self::Update, right: Self::Update) -> anyhow::Result<Self::Update> {
1011        // 1) Merge runs with right precedence.
1012        let merged = crate::value_mesh::rle::merge_value_runs(
1013            left.runs().cloned().collect(),
1014            right.runs().cloned().collect(),
1015        );
1016        // 2) Re-normalize to an overlay (validates, coalesces).
1017        Ok(ValueOverlay::try_from_runs(merged)?)
1018    }
1019}
1020
// register for concrete types:

// Registers a factory for the `Status` reducer in the global
// inventory, so the reduction machinery can construct it from its
// typehash at runtime.
hyperactor::internal_macro_support::inventory::submit! {
    ReducerFactory {
        typehash_f: <ValueOverlayReducer<crate::resource::Status> as Named>::typehash,
        builder_f: |_| Ok(Box::new(ValueOverlayReducer::<crate::resource::Status>(PhantomData))),
    }
}
1029
1030#[cfg(test)]
1031mod tests {
1032    use std::convert::Infallible;
1033    use std::future::Future;
1034    use std::pin::Pin;
1035    use std::task::Context;
1036    use std::task::Poll;
1037    use std::task::RawWaker;
1038    use std::task::RawWakerVTable;
1039    use std::task::Waker;
1040
1041    use futures::executor::block_on;
1042    use futures::future;
1043    use ndslice::extent;
1044    use ndslice::strategy::gen_region;
1045    use ndslice::view::CollectExactMeshExt;
1046    use ndslice::view::CollectIndexedMeshExt;
1047    use ndslice::view::CollectMeshExt;
1048    use ndslice::view::MapIntoExt;
1049    use ndslice::view::Ranked;
1050    use ndslice::view::RankedSliceable;
1051    use ndslice::view::ViewExt;
1052    use proptest::prelude::*;
1053    use proptest::strategy::ValueTree;
1054    use serde_json;
1055
1056    use super::*;
1057
1058    #[test]
1059    fn value_mesh_new_ok() {
1060        let region: Region = extent!(replica = 2, gpu = 3).into();
1061        let mesh = ValueMesh::new(region.clone(), (0..6).collect()).expect("new should succeed");
1062        assert_eq!(mesh.region().num_ranks(), 6);
1063        assert_eq!(mesh.values().count(), 6);
1064        assert_eq!(mesh.values().collect::<Vec<_>>(), vec![0, 1, 2, 3, 4, 5]);
1065    }
1066
1067    #[test]
1068    fn value_mesh_new_len_mismatch_is_error() {
1069        let region: Region = extent!(replica = 2, gpu = 3).into();
1070        let err = ValueMesh::new(region, vec![0_i32; 5]).unwrap_err();
1071        match err {
1072            crate::Error::InvalidRankCardinality { expected, actual } => {
1073                assert_eq!(expected, 6);
1074                assert_eq!(actual, 5);
1075            }
1076            other => panic!("unexpected error: {other:?}"),
1077        }
1078    }
1079
1080    #[test]
1081    fn value_mesh_transpose_ok_and_err() {
1082        let region: Region = extent!(x = 2).into();
1083
1084        // ok case
1085        let ok_mesh = ValueMesh::new(region.clone(), vec![Ok::<_, Infallible>(1), Ok(2)]).unwrap();
1086        let ok = ok_mesh.transpose().unwrap();
1087        assert_eq!(ok.values().collect::<Vec<_>>(), vec![1, 2]);
1088
1089        // err case: propagate user E
1090        #[derive(Debug, PartialEq)]
1091        enum E {
1092            Boom,
1093        }
1094        let err_mesh = ValueMesh::new(region, vec![Ok(1), Err(E::Boom)]).unwrap();
1095        let err = err_mesh.transpose().unwrap_err();
1096        assert_eq!(err, E::Boom);
1097    }
1098
1099    #[test]
1100    fn value_mesh_join_preserves_region_and_values() {
1101        let region: Region = extent!(x = 2, y = 2).into();
1102        let futs = vec![
1103            future::ready(10),
1104            future::ready(11),
1105            future::ready(12),
1106            future::ready(13),
1107        ];
1108        let mesh = ValueMesh::new(region.clone(), futs).unwrap();
1109
1110        let joined = block_on(mesh.join());
1111        assert_eq!(joined.region().num_ranks(), 4);
1112        assert_eq!(joined.values().collect::<Vec<_>>(), vec![10, 11, 12, 13]);
1113    }
1114
1115    #[test]
1116    fn collect_mesh_ok() {
1117        let region: Region = extent!(x = 2, y = 3).into();
1118        let mesh = (0..6)
1119            .collect_mesh::<ValueMesh<_>>(region.clone())
1120            .expect("collect_mesh should succeed");
1121
1122        assert_eq!(mesh.region().num_ranks(), 6);
1123        assert_eq!(mesh.values().collect::<Vec<_>>(), vec![0, 1, 2, 3, 4, 5]);
1124    }
1125
1126    #[test]
1127    fn collect_mesh_len_too_short_is_error() {
1128        let region: Region = extent!(x = 2, y = 3).into();
1129        let err = (0..5).collect_mesh::<ValueMesh<_>>(region).unwrap_err();
1130
1131        match err {
1132            crate::Error::InvalidRankCardinality { expected, actual } => {
1133                assert_eq!(expected, 6);
1134                assert_eq!(actual, 5);
1135            }
1136            other => panic!("unexpected error: {other:?}"),
1137        }
1138    }
1139
1140    #[test]
1141    fn collect_mesh_len_too_long_is_error() {
1142        let region: Region = extent!(x = 2, y = 3).into();
1143        let err = (0..7).collect_mesh::<ValueMesh<_>>(region).unwrap_err();
1144        match err {
1145            crate::Error::InvalidRankCardinality { expected, actual } => {
1146                assert_eq!(expected, 6);
1147                assert_eq!(actual, 7);
1148            }
1149            other => panic!("unexpected error: {other:?}"),
1150        }
1151    }
1152
1153    #[test]
1154    fn collect_mesh_from_map_pipeline() {
1155        let region: Region = extent!(x = 2, y = 2).into();
1156        let mesh = (0..4)
1157            .map(|i| i * 10)
1158            .collect_mesh::<ValueMesh<_>>(region.clone())
1159            .unwrap();
1160
1161        assert_eq!(mesh.region().num_ranks(), 4);
1162        assert_eq!(mesh.values().collect::<Vec<_>>(), vec![0, 10, 20, 30]);
1163    }
1164
1165    #[test]
1166    fn collect_exact_mesh_ok() {
1167        let region: Region = extent!(x = 2, y = 3).into();
1168        let mesh = (0..6)
1169            .collect_exact_mesh::<ValueMesh<_>>(region.clone())
1170            .expect("collect_exact_mesh should succeed");
1171
1172        assert_eq!(mesh.region().num_ranks(), 6);
1173        assert_eq!(mesh.values().collect::<Vec<_>>(), vec![0, 1, 2, 3, 4, 5]);
1174    }
1175
1176    #[test]
1177    fn collect_exact_mesh_len_too_short_is_error() {
1178        let region: Region = extent!(x = 2, y = 3).into();
1179        let err = (0..5)
1180            .collect_exact_mesh::<ValueMesh<_>>(region)
1181            .unwrap_err();
1182
1183        match err {
1184            crate::Error::InvalidRankCardinality { expected, actual } => {
1185                assert_eq!(expected, 6);
1186                assert_eq!(actual, 5);
1187            }
1188            other => panic!("unexpected error: {other:?}"),
1189        }
1190    }
1191
1192    #[test]
1193    fn collect_exact_mesh_len_too_long_is_error() {
1194        let region: Region = extent!(x = 2, y = 3).into();
1195        let err = (0..7)
1196            .collect_exact_mesh::<ValueMesh<_>>(region)
1197            .unwrap_err();
1198
1199        match err {
1200            crate::Error::InvalidRankCardinality { expected, actual } => {
1201                assert_eq!(expected, 6);
1202                assert_eq!(actual, 7);
1203            }
1204            other => panic!("unexpected error: {other:?}"),
1205        }
1206    }
1207
1208    #[test]
1209    fn collect_exact_mesh_from_map_pipeline() {
1210        let region: Region = extent!(x = 2, y = 2).into();
1211        let mesh = (0..4)
1212            .map(|i| i * 10)
1213            .collect_exact_mesh::<ValueMesh<_>>(region.clone())
1214            .unwrap();
1215
1216        assert_eq!(mesh.region().num_ranks(), 4);
1217        assert_eq!(mesh.values().collect::<Vec<_>>(), vec![0, 10, 20, 30]);
1218    }
1219
1220    #[test]
1221    fn collect_indexed_ok_shuffled() {
1222        let region: Region = extent!(x = 2, y = 3).into();
1223        // (rank, value) in shuffled order; values = rank * 10
1224        let pairs = vec![(3, 30), (0, 0), (5, 50), (2, 20), (1, 10), (4, 40)];
1225        let mesh = pairs
1226            .into_iter()
1227            .collect_indexed::<ValueMesh<_>>(region.clone())
1228            .unwrap();
1229
1230        assert_eq!(mesh.region().num_ranks(), 6);
1231        assert_eq!(
1232            mesh.values().collect::<Vec<_>>(),
1233            vec![0, 10, 20, 30, 40, 50]
1234        );
1235    }
1236
1237    #[test]
1238    fn collect_indexed_missing_rank_is_error() {
1239        let region: Region = extent!(x = 2, y = 2).into(); // 4
1240        // Missing rank 3
1241        let pairs = vec![(0, 100), (1, 101), (2, 102)];
1242        let err = pairs
1243            .into_iter()
1244            .collect_indexed::<ValueMesh<_>>(region)
1245            .unwrap_err();
1246
1247        match err {
1248            crate::Error::InvalidRankCardinality { expected, actual } => {
1249                assert_eq!(expected, 4);
1250                assert_eq!(actual, 3); // Distinct ranks seen.
1251            }
1252            other => panic!("unexpected error: {other:?}"),
1253        }
1254    }
1255
1256    #[test]
1257    fn collect_indexed_out_of_bounds_is_error() {
1258        let region: Region = extent!(x = 2, y = 2).into(); // 4 (valid ranks 0..=3)
1259        let pairs = vec![(0, 1), (4, 9)]; // 4 is out-of-bounds
1260        let err = pairs
1261            .into_iter()
1262            .collect_indexed::<ValueMesh<_>>(region)
1263            .unwrap_err();
1264
1265        match err {
1266            crate::Error::InvalidRankCardinality { expected, actual } => {
1267                assert_eq!(expected, 4);
1268                assert_eq!(actual, 5); // offending index + 1
1269            }
1270            other => panic!("unexpected error: {other:?}"),
1271        }
1272    }
1273
1274    #[test]
1275    fn collect_indexed_duplicate_last_write_wins() {
1276        let region: Region = extent!(x = 1, y = 3).into(); // 3
1277        // rank 1 appears twice; last value should stick
1278        let pairs = vec![(0, 7), (1, 8), (1, 88), (2, 9)];
1279        let mesh = pairs
1280            .into_iter()
1281            .collect_indexed::<ValueMesh<_>>(region.clone())
1282            .unwrap();
1283
1284        assert_eq!(mesh.values().collect::<Vec<_>>(), vec![7, 88, 9]);
1285    }
1286
1287    // Indexed collector naïve implementation (for reference).
1288    #[allow(clippy::result_large_err)]
1289    fn build_value_mesh_indexed<T>(
1290        region: Region,
1291        pairs: impl IntoIterator<Item = (usize, T)>,
1292    ) -> crate::Result<ValueMesh<T>> {
1293        let n = region.num_ranks();
1294
1295        // Buffer for exactly n slots; fill by rank.
1296        let mut buf: Vec<Option<T>> = std::iter::repeat_with(|| None).take(n).collect();
1297        let mut filled = 0usize;
1298
1299        for (rank, value) in pairs {
1300            if rank >= n {
1301                // Out-of-bounds: report `expected` = n, `actual` =
1302                // offending index + 1; i.e. number of ranks implied
1303                // so far.
1304                return Err(crate::Error::InvalidRankCardinality {
1305                    expected: n,
1306                    actual: rank + 1,
1307                });
1308            }
1309            if buf[rank].is_none() {
1310                filled += 1;
1311            }
1312            buf[rank] = Some(value); // Last write wins.
1313        }
1314
1315        if filled != n {
1316            // Missing ranks: actual = number of distinct ranks seen.
1317            return Err(crate::Error::InvalidRankCardinality {
1318                expected: n,
1319                actual: filled,
1320            });
1321        }
1322
1323        // All present and in-bounds: unwrap and build unchecked.
1324        let ranks: Vec<T> = buf.into_iter().map(Option::unwrap).collect();
1325        Ok(ValueMesh::new_unchecked(region, ranks))
1326    }
1327
1328    /// This uses the bit-mixing portion of Sebastiano Vigna's
1329    /// [SplitMix64 algorithm](https://prng.di.unimi.it/splitmix64.c)
1330    /// to generate a high-quality 64-bit hash from a usize index.
1331    /// Unlike the full SplitMix64 generator, this is stateless - we
1332    /// accept an arbitrary x as input and apply the mix function to
1333    /// turn `x` deterministically into a "randomized" u64. input
1334    /// always produces the same output.
1335    fn hash_key(x: usize) -> u64 {
1336        let mut z = x as u64 ^ 0x9E3779B97F4A7C15;
1337        z = (z ^ (z >> 30)).wrapping_mul(0xBF58476D1CE4E5B9);
1338        z = (z ^ (z >> 27)).wrapping_mul(0x94D049BB133111EB);
1339        z ^ (z >> 31)
1340    }
1341
    /// Shuffle a slice deterministically, using a hash of indices as
    /// the key.
    ///
    /// Each position `i` is assigned a pseudo-random 64-bit key (from
    /// `key(i)`), the slice is sorted by those keys, and the
    /// resulting permutation is applied in place.
    ///
    /// The permutation is fully determined by the sequence of indices
    /// `0..n` and the chosen `key` function. Running it twice on the
    /// same input yields the same "random-looking" arrangement.
    ///
    /// This is going to be used (below) for property tests: it gives
    /// the effect of a shuffle without introducing global RNG state,
    /// and ensures that duplicate elements are still ordered
    /// consistently (so we can test "last write wins" semantics in
    /// collectors).
    fn pseudo_shuffle<'a, T: 'a>(v: &'a mut [T], key: impl Fn(usize) -> u64 + Copy) {
        // Build perm.
        let mut with_keys: Vec<(u64, usize)> = (0..v.len()).map(|i| (key(i), i)).collect();
        // `sort_by_key` is stable, so any equal keys keep their index
        // order — the permutation stays fully deterministic.
        with_keys.sort_by_key(|&(k, _)| k);
        let perm: Vec<usize> = with_keys.into_iter().map(|(_, i)| i).collect();

        // In-place permutation using a cycle based approach (e.g.
        // https://www.geeksforgeeks.org/dsa/permute-the-elements-of-an-array-following-given-order/).
        // Each element is swapped along its cycle exactly once;
        // `seen` marks positions already placed.
        let mut seen = vec![false; v.len()];
        for i in 0..v.len() {
            if seen[i] {
                continue;
            }
            let mut a = i;
            while !seen[a] {
                seen[a] = true;
                let b = perm[a];
                // Short circuit on the cycle's start index.
                if b == i {
                    break;
                }
                v.swap(a, b);
                a = b;
            }
        }
    }
1384
    // Property: Optimized and reference collectors yield the same
    // `ValueMesh` on complete inputs, even with duplicates.
    //
    // - Begin with a complete set of `(rank, value)` pairs covering
    //   all ranks of the region.
    // - Add extra pairs at arbitrary ranks (up to `extra_len`), which
    //   necessarily duplicate existing entries when `extra_len > 0`.
    // - Shuffle the combined pairs deterministically.
    // - Collect using both the reference (`try_collect_indexed`) and
    //   optimized (`try_collect_indexed_opt`) implementations.
    //
    // Both collectors must succeed and produce identical results.
    // This demonstrates that the optimized version preserves
    // last-write-wins semantics and agrees exactly with the reference
    // behavior.
    proptest! {
        #[test]
        fn try_collect_opt_equivalence(region in gen_region(1..=4, 6), extra_len in 0usize..=12) {
            let n = region.num_ranks();

            // Start with one pair per rank (coverage guaranteed).
            let mut pairs: Vec<(usize, i64)> = (0..n).map(|r| (r, r as i64)).collect();

            // Add some extra duplicates of random in-bounds ranks.
            // Their values differ so last-write-wins is observable.
            // NOTE(review): the extras come from a fresh
            // `TestRunner::default()`, so they are not shrunk
            // together with the outer proptest case.
            let extras = proptest::collection::vec(0..n, extra_len)
                .new_tree(&mut proptest::test_runner::TestRunner::default())
                .unwrap()
                .current();
            for (k, r) in extras.into_iter().enumerate() {
                pairs.push((r, (n as i64) + (k as i64)));
            }

            // Deterministic "shuffle" to fix iteration order across
            // both collectors.
            pseudo_shuffle(&mut pairs, hash_key);

            // Reference vs optimized.
            let mesh_ref = build_value_mesh_indexed(region.clone(), pairs.clone()).unwrap();
            let mesh_opt = pairs.into_iter().collect_indexed::<ValueMesh<_>>(region.clone()).unwrap();

            prop_assert_eq!(mesh_ref.region(), mesh_opt.region());
            prop_assert_eq!(mesh_ref.values().collect::<Vec<_>>(), mesh_opt.values().collect::<Vec<_>>());
        }
    }
1430
    // Property: Optimized and reference collectors report identical
    // errors when ranks are missing.
    //
    // - Begin with a complete set of `(rank, value)` pairs.
    // - Remove one rank so coverage is incomplete.
    // - Shuffle deterministically.
    // - Collect with both implementations.
    //
    // Both must fail with `InvalidRankCardinality` describing the
    // same expected vs. actual counts.
    proptest! {
        #[test]
        fn try_collect_opt_missing_rank_errors_match(region in gen_region(1..=4, 6)) {
            let n = region.num_ranks();
            // Base complete.
            let mut pairs: Vec<(usize, i64)> = (0..n).map(|r| (r, r as i64)).collect();
            // Drop one distinct rank.
            if n > 0 {
                let drop_idx = 0usize; // Deterministic, fine for the property.
                pairs.remove(drop_idx);
            }
            // Shuffle deterministically.
            pseudo_shuffle(&mut pairs, hash_key);

            // Compare the two errors via their Debug renderings,
            // which include both `expected` and `actual`.
            let ref_err  = build_value_mesh_indexed(region.clone(), pairs.clone()).unwrap_err();
            let opt_err  = pairs.into_iter().collect_indexed::<ValueMesh<_>>(region).unwrap_err();
            assert_eq!(format!("{ref_err:?}"), format!("{opt_err:?}"));
        }
    }
1460
    // Property: Optimized and reference collectors report identical
    // errors when given out-of-bounds ranks.
    //
    // - Construct a set of `(rank, value)` pairs.
    // - Include at least one pair whose rank is ≥
    //   `region.num_ranks()`.
    // - Shuffle deterministically.
    // - Collect with both implementations.
    //
    // Both must fail with `InvalidRankCardinality`, and the reported
    // error values must match exactly.
    proptest! {
        #[test]
        fn try_collect_opt_out_of_bound_errors_match(region in gen_region(1..=4, 6)) {
            let n = region.num_ranks();
            // One valid, then one out-of-bound (rank `n` is the first
            // invalid rank for an n-rank region).
            let mut pairs = vec![(0usize, 0i64), (n, 123i64)];
            pseudo_shuffle(&mut pairs, hash_key);

            let ref_err = build_value_mesh_indexed(region.clone(), pairs.clone()).unwrap_err();
            let opt_err = pairs.into_iter().collect_indexed::<ValueMesh<_>>(region).unwrap_err();
            assert_eq!(format!("{ref_err:?}"), format!("{opt_err:?}"));
        }
    }
1485
1486    #[test]
1487    fn map_into_preserves_region_and_order() {
1488        let region: Region = extent!(rows = 2, cols = 3).into();
1489        let vm = ValueMesh::new_unchecked(region.clone(), vec![0, 1, 2, 3, 4, 5]);
1490
1491        let doubled: ValueMesh<_> = vm.map_into(|x| x * 2);
1492        assert_eq!(doubled.region, region);
1493        assert_eq!(
1494            doubled.values().collect::<Vec<_>>(),
1495            vec![0, 2, 4, 6, 8, 10]
1496        );
1497    }
1498
1499    #[test]
1500    fn map_into_ref_borrows_and_preserves() {
1501        let region: Region = extent!(n = 4).into();
1502        let vm = ValueMesh::new_unchecked(
1503            region.clone(),
1504            vec!["a".to_string(), "b".into(), "c".into(), "d".into()],
1505        );
1506
1507        let lens: ValueMesh<_> = vm.map_into(|s| s.len());
1508        assert_eq!(lens.region, region);
1509        assert_eq!(lens.values().collect::<Vec<_>>(), vec![1, 1, 1, 1]);
1510    }
1511
1512    #[test]
1513    fn try_map_into_short_circuits_on_error() {
1514        let region = extent!(n = 4).into();
1515        let vm = ValueMesh::new_unchecked(region, vec![1, 2, 3, 4]);
1516
1517        let res: Result<ValueMesh<i32>, &'static str> =
1518            vm.try_map_into(|x| if x == &3 { Err("boom") } else { Ok(x + 10) });
1519
1520        assert!(res.is_err());
1521        assert_eq!(res.unwrap_err(), "boom");
1522    }
1523
    // NOTE(review): this body is byte-identical to
    // `try_map_into_short_circuits_on_error` — it calls
    // `try_map_into`, not a by-reference variant despite the `_ref`
    // name. Presumably it was meant to exercise a `try_map_into_ref`
    // API; confirm and update the call (or remove the duplicate).
    #[test]
    fn try_map_into_ref_short_circuits_on_error() {
        let region = extent!(n = 4).into();
        let vm = ValueMesh::new_unchecked(region, vec![1, 2, 3, 4]);

        let res: Result<ValueMesh<i32>, &'static str> =
            vm.try_map_into(|x| if x == &3 { Err("boom") } else { Ok(x + 10) });

        assert!(res.is_err());
        assert_eq!(res.unwrap_err(), "boom");
    }
1535
    // -- Helper to poll `core::future::Ready` without a runtime
    //
    // Builds a no-op `Waker` backed by a null data pointer and a
    // vtable of empty functions; sufficient for polling futures that
    // are already ready.
    fn noop_waker() -> Waker {
        // Cloning just re-creates the same stateless waker.
        fn clone(_: *const ()) -> RawWaker {
            RawWaker::new(std::ptr::null(), &VTABLE)
        }
        fn wake(_: *const ()) {}
        fn wake_by_ref(_: *const ()) {}
        fn drop(_: *const ()) {}
        static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, wake, wake_by_ref, drop);
        // SAFETY: The raw waker never dereferences its data pointer
        // (`null`), and all vtable fns are no-ops. It's only used to
        // satisfy `Context` for polling already-ready futures in
        // tests.
        unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
    }
1551
1552    fn poll_now<F: Future>(mut fut: F) -> F::Output {
1553        let waker = noop_waker();
1554        let mut cx = Context::from_waker(&waker);
1555        // SAFETY: `fut` is a local stack variable that we never move
1556        // after pinning, and we only use it to poll immediately
1557        // within this scope. This satisfies the invariants of
1558        // `Pin::new_unchecked`.
1559        let mut fut = unsafe { Pin::new_unchecked(&mut fut) };
1560        match fut.as_mut().poll(&mut cx) {
1561            Poll::Ready(v) => v,
1562            Poll::Pending => unreachable!("Ready futures must complete immediately"),
1563        }
1564    }
1565    // --
1566
1567    #[test]
1568    fn map_into_ready_futures() {
1569        let region: Region = extent!(r = 2, c = 2).into();
1570        let vm = ValueMesh::new_unchecked(region.clone(), vec![10, 20, 30, 40]);
1571
1572        // Map to `core::future::Ready` futures.
1573        let pending: ValueMesh<core::future::Ready<_>> =
1574            vm.map_into(|x| core::future::ready(x + 1));
1575        assert_eq!(pending.region, region);
1576
1577        // Drive the ready futures without a runtime and collect results.
1578        let results: Vec<_> = pending.values().map(|f| poll_now(f.clone())).collect();
1579        assert_eq!(results, vec![11, 21, 31, 41]);
1580    }
1581
1582    #[test]
1583    fn map_into_single_element_mesh() {
1584        let region: Region = extent!(n = 1).into();
1585        let vm = ValueMesh::new_unchecked(region.clone(), vec![7]);
1586
1587        let out: ValueMesh<_> = vm.map_into(|x| x * x);
1588        assert_eq!(out.region, region);
1589        assert_eq!(out.values().collect::<Vec<_>>(), vec![49]);
1590    }
1591
1592    #[test]
1593    fn map_into_ref_with_non_clone_field() {
1594        // A type that intentionally does NOT implement Clone.
1595        #[derive(Debug, PartialEq, Eq)]
1596        struct NotClone(i32);
1597
1598        let region: Region = extent!(x = 3).into();
1599        let values = vec![(10, NotClone(1)), (20, NotClone(2)), (30, NotClone(3))];
1600        let mesh: ValueMesh<(i32, NotClone)> =
1601            values.into_iter().collect_mesh(region.clone()).unwrap();
1602
1603        let projected: ValueMesh<i32> = mesh.map_into(|t| t.0);
1604        assert_eq!(projected.values().collect::<Vec<_>>(), vec![10, 20, 30]);
1605        assert_eq!(projected.region(), &region);
1606    }
1607
1608    #[test]
1609    fn rle_roundtrip_all_equal() {
1610        let region: Region = extent!(n = 6).into();
1611        let mut vm = ValueMesh::new_unchecked(region.clone(), vec![42; 6]);
1612
1613        // Compress and ensure logical equality preserved.
1614        vm.compress_adjacent_in_place();
1615        let collected: Vec<_> = vm.values().collect();
1616        assert_eq!(collected, vec![42, 42, 42, 42, 42, 42]);
1617
1618        // Random access still works.
1619        for i in 0..region.num_ranks() {
1620            assert_eq!(vm.get(i), Some(&42));
1621        }
1622        assert_eq!(vm.get(region.num_ranks()), None); // out-of-bounds
1623    }
1624
1625    #[test]
1626    fn rle_roundtrip_alternating() {
1627        let region: Region = extent!(n = 6).into();
1628        let original = vec![1, 2, 1, 2, 1, 2];
1629        let mut vm = ValueMesh::new_unchecked(region.clone(), original.clone());
1630
1631        vm.compress_adjacent_in_place();
1632        let collected: Vec<_> = vm.values().collect();
1633        assert_eq!(collected, original);
1634
1635        // Spot-check random access after compression.
1636        assert_eq!(vm.get(0), Some(&1));
1637        assert_eq!(vm.get(1), Some(&2));
1638        assert_eq!(vm.get(3), Some(&2));
1639        assert_eq!(vm.get(5), Some(&2));
1640    }
1641
1642    #[test]
1643    fn rle_roundtrip_blocky_and_slice() {
1644        // Blocks: 0,0,0 | 1,1 | 2,2,2,2 | 3
1645        let region: Region = extent!(n = 10).into();
1646        let original = vec![0, 0, 0, 1, 1, 2, 2, 2, 2, 3];
1647        let mut vm = ValueMesh::new_unchecked(region.clone(), original.clone());
1648
1649        vm.compress_adjacent_in_place();
1650        let collected: Vec<_> = vm.values().collect();
1651        assert_eq!(collected, original);
1652
1653        // Slice a middle subregion [3..8) → [1,1,2,2,2]
1654        let sub_region = region.range("n", 3..8).unwrap();
1655        let sliced = vm.sliced(sub_region);
1656        let sliced_vec: Vec<_> = sliced.values().collect();
1657        assert_eq!(sliced_vec, vec![1, 1, 2, 2, 2]);
1658    }
1659
1660    #[test]
1661    fn rle_idempotent_noop_on_second_call() {
1662        let region: Region = extent!(n = 7).into();
1663        let original = vec![9, 9, 9, 8, 8, 9, 9];
1664        let mut vm = ValueMesh::new_unchecked(region.clone(), original.clone());
1665
1666        vm.compress_adjacent_in_place();
1667        let once: Vec<_> = vm.values().collect();
1668        assert_eq!(once, original);
1669
1670        // Calling again should be a no-op and still yield identical
1671        // values.
1672        vm.compress_adjacent_in_place();
1673        let twice: Vec<_> = vm.values().collect();
1674        assert_eq!(twice, original);
1675    }
1676
1677    #[test]
1678    fn rle_works_after_build_indexed() {
1679        // Build with shuffled pairs, then compress and verify
1680        // semantics.
1681        let region: Region = extent!(x = 2, y = 3).into(); // 6
1682        let pairs = vec![(3, 30), (0, 0), (5, 50), (2, 20), (1, 10), (4, 40)];
1683        let mut vm = pairs
1684            .into_iter()
1685            .collect_indexed::<ValueMesh<_>>(region.clone())
1686            .unwrap();
1687
1688        // Should compress to 6 runs of length 1; still must
1689        // round-trip.
1690        vm.compress_adjacent_in_place();
1691        let collected: Vec<_> = vm.values().collect();
1692        assert_eq!(collected, vec![0, 10, 20, 30, 40, 50]);
1693        // Spot-check get()
1694        assert_eq!(vm.get(4), Some(&40));
1695    }
1696
1697    #[test]
1698    fn rle_handles_singleton_mesh() {
1699        let region: Region = extent!(n = 1).into();
1700        let mut vm = ValueMesh::new_unchecked(region.clone(), vec![123]);
1701
1702        vm.compress_adjacent_in_place();
1703        let collected: Vec<_> = vm.values().collect();
1704        assert_eq!(collected, vec![123]);
1705        assert_eq!(vm.get(0), Some(&123));
1706        assert_eq!(vm.get(1), None);
1707    }
1708
1709    #[test]
1710    fn test_dense_round_trip_json() {
1711        // Build a simple dense mesh of 5 integers.
1712        let region: Region = extent!(x = 5).into();
1713        let dense = ValueMesh::new(region.clone(), vec![1, 2, 3, 4, 5]).unwrap();
1714
1715        let json = serde_json::to_string_pretty(&dense).unwrap();
1716        let restored: ValueMesh<i32> = serde_json::from_str(&json).unwrap();
1717
1718        assert_eq!(dense, restored);
1719
1720        // Dense meshes should stay dense on the wire: check the
1721        // tagged variant.
1722        let v: serde_json::Value = serde_json::from_str(&json).unwrap();
1723        // enum tag is nested: {"rep": {"Dense":{...}}
1724        let tag = v.get("rep").and_then(|o| o.get("Dense"));
1725        assert!(tag.is_some(), "json is {}", json);
1726    }
1727
1728    #[test]
1729    fn test_dense_round_trip_bincode() {
1730        // Build a simple dense mesh of 5 integers.
1731        let region: Region = extent!(x = 5).into();
1732        let dense = ValueMesh::new(region.clone(), vec![1, 2, 3, 4, 5]).unwrap();
1733
1734        let encoded = bincode::serialize(&dense).unwrap();
1735        let restored: ValueMesh<i32> = bincode::deserialize(&encoded).unwrap();
1736
1737        assert_eq!(dense, restored);
1738        // Dense meshes should stay dense on the wire.
1739        assert!(matches!(restored.rep, Rep::Dense { .. }));
1740    }
1741
1742    #[test]
1743    fn test_compressed_round_trip_json() {
1744        // Build a dense mesh, compress it, and verify it stays
1745        // compressed on the wire.
1746        let region: Region = extent!(x = 10).into();
1747        let mut mesh = ValueMesh::new(region.clone(), vec![1, 1, 1, 2, 2, 3, 3, 3, 3, 3]).unwrap();
1748        mesh.compress_adjacent_in_place();
1749
1750        let json = serde_json::to_string_pretty(&mesh).unwrap();
1751        let restored: ValueMesh<i32> = serde_json::from_str(&json).unwrap();
1752
1753        // Logical equality preserved.
1754        assert_eq!(mesh, restored);
1755
1756        // Compressed meshes should stay compressed on the wire.
1757        let v: serde_json::Value = serde_json::from_str(&json).unwrap();
1758        // enum tag is nested: {"rep": {"Compressed": {...}}
1759        let tag = v.get("rep").and_then(|o| o.get("Compressed"));
1760        assert!(tag.is_some(), "json is {}", json);
1761    }
1762
1763    #[test]
1764    fn test_compressed_round_trip_bincode() {
1765        // Build a dense mesh, compress it, and verify it stays
1766        // compressed on the wire.
1767        let region: Region = extent!(x = 10).into();
1768        let mut mesh = ValueMesh::new(region.clone(), vec![1, 1, 1, 2, 2, 3, 3, 3, 3, 3]).unwrap();
1769        mesh.compress_adjacent_in_place();
1770
1771        let encoded = bincode::serialize(&mesh).unwrap();
1772        let restored: ValueMesh<i32> = bincode::deserialize(&encoded).unwrap();
1773
1774        // Logical equality preserved.
1775        assert_eq!(mesh, restored);
1776        // Compressed meshes should stay compressed on the wire.
1777        assert!(matches!(restored.rep, Rep::Compressed { .. }));
1778    }
1779
1780    #[test]
1781    fn test_stable_run_encoding() {
1782        let run = Run::new(0, 10, 42);
1783        let json = serde_json::to_string(&run).unwrap();
1784        let decoded: Run = serde_json::from_str(&json).unwrap();
1785
1786        assert_eq!(run, decoded);
1787        assert_eq!(run.start, 0);
1788        assert_eq!(run.end, 10);
1789        assert_eq!(run.id, 42);
1790
1791        // Ensure conversion back to Range<usize> works.
1792        let (range, id): (Range<usize>, u32) = run.try_into().unwrap();
1793        assert_eq!(range, 0..10);
1794        assert_eq!(id, 42);
1795    }
1796
1797    #[test]
1798    fn from_single_builds_single_run() {
1799        let region: Region = extent!(n = 6).into();
1800        let vm = ValueMesh::from_single(region.clone(), 7);
1801
1802        assert_eq!(vm.region(), &region);
1803        assert_eq!(vm.values().collect::<Vec<_>>(), vec![7, 7, 7, 7, 7, 7]);
1804        assert_eq!(vm.get(0), Some(&7));
1805        assert_eq!(vm.get(5), Some(&7));
1806        assert_eq!(vm.get(6), None);
1807    }
1808
1809    #[test]
1810    fn from_default_builds_with_default_value() {
1811        let region: Region = extent!(n = 6).into();
1812        let vm = ValueMesh::<i32>::from_default(region.clone());
1813
1814        assert_eq!(vm.region(), &region);
1815        // i32::default() == 0
1816        assert_eq!(vm.values().collect::<Vec<_>>(), vec![0, 0, 0, 0, 0, 0]);
1817        assert_eq!(vm.get(0), Some(&0));
1818        assert_eq!(vm.get(5), Some(&0));
1819    }
1820
1821    #[test]
1822    fn test_default_vs_single_equivalence() {
1823        let region: Region = extent!(x = 4).into();
1824        let d1 = ValueMesh::<i32>::from_default(region.clone());
1825        let d2 = ValueMesh::from_single(region.clone(), 0);
1826        assert_eq!(d1, d2);
1827    }
1828
1829    #[test]
1830    fn build_from_ranges_with_default_basic() {
1831        let region: Region = extent!(n = 10).into();
1832        let vm = ValueMesh::from_ranges_with_default(
1833            region.clone(),
1834            0, // default
1835            vec![(2..4, 1), (6..9, 2)],
1836        )
1837        .unwrap();
1838
1839        assert_eq!(vm.region(), &region);
1840        assert_eq!(
1841            vm.values().collect::<Vec<_>>(),
1842            vec![0, 0, 1, 1, 0, 0, 2, 2, 2, 0]
1843        );
1844
1845        // Internal shape: [0..2)->0, [2..4)->1, [4..6)->0, [6..9)->2,
1846        // [9..10)->0
1847        if let Rep::Compressed { table, runs } = &vm.rep {
1848            // Table is small and de-duplicated.
1849            assert!(table.len() <= 3);
1850            assert_eq!(runs.len(), 5);
1851        } else {
1852            panic!("expected compressed");
1853        }
1854    }
1855
1856    #[test]
1857    fn build_from_ranges_with_default_edge_cases() {
1858        let region: Region = extent!(n = 5).into();
1859
1860        // Full override covers entire region.
1861        let vm = ValueMesh::from_ranges_with_default(region.clone(), 9, vec![(0..5, 3)]).unwrap();
1862        assert_eq!(vm.values().collect::<Vec<_>>(), vec![3, 3, 3, 3, 3]);
1863
1864        // Adjacent overrides and default gaps.
1865        let vm = ValueMesh::from_ranges_with_default(region.clone(), 0, vec![(1..2, 7), (2..4, 7)])
1866            .unwrap();
1867        assert_eq!(vm.values().collect::<Vec<_>>(), vec![0, 7, 7, 7, 0]);
1868
1869        // Empty region.
1870        let empty_region: Region = extent!(n = 0).into();
1871        let vm = ValueMesh::from_ranges_with_default(empty_region.clone(), 42, vec![]).unwrap();
1872        assert_eq!(vm.values().collect::<Vec<_>>(), Vec::<i32>::new());
1873    }
1874
1875    #[test]
1876    fn from_dense_builds_and_compresses() {
1877        let region: Region = extent!(n = 6).into();
1878        let mesh = ValueMesh::from_dense(region.clone(), vec![1, 1, 2, 2, 3, 3]).unwrap();
1879
1880        assert_eq!(mesh.region(), &region);
1881        assert!(matches!(mesh.rep, Rep::Compressed { .. }));
1882        assert_eq!(mesh.values().collect::<Vec<_>>(), vec![1, 1, 2, 2, 3, 3]);
1883
1884        // Spot-check indexing.
1885        assert_eq!(mesh.get(0), Some(&1));
1886        assert_eq!(mesh.get(3), Some(&2));
1887        assert_eq!(mesh.get(5), Some(&3));
1888    }
1889
1890    #[test]
1891    fn merge_from_overlay_basic() {
1892        // Base mesh with two contiguous runs.
1893        let region: Region = extent!(n = 8).into();
1894        let mut mesh = ValueMesh::from_dense(region.clone(), vec![1, 1, 1, 2, 2, 2, 3, 3]).unwrap();
1895
1896        // Overlay replaces middle segment [2..6) with 9s.
1897        let overlay = ValueOverlay::try_from_runs(vec![(2..6, 9)]).unwrap();
1898
1899        mesh.merge_from_overlay(overlay).unwrap();
1900
1901        // Materialize back into ranges to inspect.
1902        let out = mesh.materialized_runs();
1903
1904        // Expected: left prefix (0..2)=1, replaced middle (2..6)=9, tail (6..8)=3.
1905        assert_eq!(out, vec![(0..2, 1), (2..6, 9), (6..8, 3)]);
1906    }
1907
1908    #[test]
1909    fn merge_from_overlay_multiple_spans() {
1910        // Build mesh with alternating runs.
1911        let region: Region = extent!(m = 12).into();
1912        let mut mesh =
1913            ValueMesh::from_dense(region.clone(), vec![1, 1, 1, 2, 2, 2, 3, 3, 3, 4, 4, 4])
1914                .unwrap();
1915
1916        // Overlay has a run that spans across the boundary of two
1917        // left runs and another disjoint run later.
1918        let overlay = ValueOverlay::try_from_runs(vec![(2..6, 9), (9..11, 8)]).unwrap();
1919
1920        mesh.merge_from_overlay(overlay).unwrap();
1921        let out = mesh.materialized_runs();
1922
1923        // Expected after merge and re-compression:
1924        // (0..2,1) untouched
1925        // (2..6,9) overwrite of part of [1,2] runs
1926        // (6..9,3) left tail survives
1927        // (9..11,8) overwrite inside [4] run
1928        // (11..12,4) leftover tail
1929        assert_eq!(
1930            out,
1931            vec![(0..2, 1), (2..6, 9), (6..9, 3), (9..11, 8), (11..12, 4)]
1932        );
1933    }
1934
1935    #[test]
1936    fn merge_from_overlay_crosses_row_boundary() {
1937        // 2 x 5 region -> 10 linear ranks in row-major order.
1938        let region: Region = extent!(rows = 2, cols = 5).into();
1939
1940        // Dense values laid out row-major:
1941        // row 0: [1, 1, 1, 2, 2]
1942        // row 1: [3, 3, 4, 4, 4]
1943        let mut mesh =
1944            ValueMesh::from_dense(region.clone(), vec![1, 1, 1, 2, 2, 3, 3, 4, 4, 4]).unwrap();
1945
1946        // Overlay that crosses the row boundary:
1947        // linear ranks [3..7) -> 9
1948        //   - tail of row 0: indices 3,4 (the two 2s)
1949        //   - head of row 1: indices 5,6 (the two 3s)
1950        let overlay = ValueOverlay::try_from_runs(vec![(3..7, 9)]).unwrap();
1951
1952        mesh.merge_from_overlay(overlay).unwrap();
1953
1954        // After merge, the dense view should be:
1955        // [1,1,1, 9,9, 9,9, 4,4,4]
1956        let flat: Vec<_> = mesh.values().collect();
1957        assert_eq!(flat, vec![1, 1, 1, 9, 9, 9, 9, 4, 4, 4]);
1958
1959        // And the materialized runs should reflect that:
1960        // (0..3,1) | (3..7,9) | (7..10,4)
1961        let runs = mesh.materialized_runs();
1962        assert_eq!(runs, vec![(0..3, 1), (3..7, 9), (7..10, 4)]);
1963    }
1964
1965    #[test]
1966    fn accumulator_initializes_and_merges_overlays() {
1967        /// Simple three-variant enum local to tests, avoiding a
1968        /// cross-module dependency on `crate::resource::Status`.
1969        #[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Named)]
1970        enum Val {
1971            Init,
1972            Running,
1973            Stopped,
1974        }
1975
1976        let region: Region = extent!(m = 8).into();
1977        let template = ValueMesh::from_single(region.clone(), Val::Init);
1978
1979        // State starts empty (default).
1980        let mut state = ValueMesh::<Val>::default();
1981        assert_eq!(state.region().num_ranks(), 0);
1982
1983        // First accumulate: state should be cloned from template.
1984        let overlay1 = ValueOverlay::try_from_runs(vec![(2..5, Val::Running)]).unwrap();
1985        template.accumulate(&mut state, overlay1).unwrap();
1986
1987        // State now has the full region, with ranks 2..5 set to Running.
1988        assert_eq!(state.region().num_ranks(), 8);
1989        let vals: Vec<_> = state.values().collect();
1990        assert_eq!(
1991            vals,
1992            vec![
1993                Val::Init,
1994                Val::Init,
1995                Val::Running,
1996                Val::Running,
1997                Val::Running,
1998                Val::Init,
1999                Val::Init,
2000                Val::Init,
2001            ]
2002        );
2003
2004        // Second accumulate: overlay a different subrange with Stopped.
2005        let overlay2 = ValueOverlay::try_from_runs(vec![(5..7, Val::Stopped)]).unwrap();
2006        template.accumulate(&mut state, overlay2).unwrap();
2007
2008        let vals: Vec<_> = state.values().collect();
2009        assert_eq!(
2010            vals,
2011            vec![
2012                Val::Init,
2013                Val::Init,
2014                Val::Running,
2015                Val::Running,
2016                Val::Running,
2017                Val::Stopped,
2018                Val::Stopped,
2019                Val::Init,
2020            ]
2021        );
2022
2023        // reducer_spec should return Some with the correct typehash.
2024        let spec = template
2025            .reducer_spec()
2026            .expect("reducer_spec should be Some");
2027        assert_eq!(
2028            spec.typehash,
2029            <ValueOverlayReducer<Val> as Named>::typehash()
2030        );
2031    }
2032
2033    #[test]
2034    fn value_overlay_reducer_merges_with_right_wins() {
2035        /// Simple three-variant enum local to tests, avoiding a
2036        /// cross-module dependency on `crate::resource::Status`.
2037        #[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Named)]
2038        enum Val {
2039            Init,
2040            Running,
2041            Stopped,
2042        }
2043
2044        let reducer = ValueOverlayReducer::<Val>(PhantomData);
2045
2046        // Left overlay: ranks 0..4 = Running, 6..8 = Stopped
2047        let left =
2048            ValueOverlay::try_from_runs(vec![(0..4, Val::Running), (6..8, Val::Stopped)]).unwrap();
2049
2050        // Right overlay: ranks 2..6 = Init
2051        // Overlaps with left on 2..4 (should overwrite Running).
2052        let right = ValueOverlay::try_from_runs(vec![(2..6, Val::Init)]).unwrap();
2053
2054        let merged = reducer.reduce(left, right).unwrap();
2055        let runs: Vec<_> = merged.runs().cloned().collect();
2056
2057        // Expected (right wins on overlap):
2058        // 0..2 Running (from left, no overlap)
2059        // 2..6 Init (from right, overwrites left 2..4)
2060        // 6..8 Stopped (from left, no overlap)
2061        assert_eq!(
2062            runs,
2063            vec![
2064                (0..2, Val::Running),
2065                (2..6, Val::Init),
2066                (6..8, Val::Stopped),
2067            ]
2068        );
2069    }
2070}