hyperactor_mesh/v1/value_mesh.rs
1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9use std::cmp::Ordering;
10use std::collections::HashMap;
11use std::collections::hash_map::Entry;
12use std::hash::Hash;
13use std::marker::PhantomData;
14use std::mem;
15use std::mem::MaybeUninit;
16use std::ops::Range;
17use std::ptr;
18use std::ptr::NonNull;
19
20use futures::Future;
21use hyperactor::Named;
22use hyperactor::accum::Accumulator;
23use hyperactor::accum::CommReducer;
24use hyperactor::accum::ReducerFactory;
25use hyperactor::accum::ReducerSpec;
26use ndslice::extent;
27use ndslice::view;
28use ndslice::view::Ranked;
29use ndslice::view::Region;
30use serde::Deserialize;
31use serde::Serialize;
32
33mod rle;
34mod value_overlay;
35pub use value_overlay::BuildError;
36pub use value_overlay::ValueOverlay;
37
38/// A mesh of values, one per rank in `region`.
39///
40/// The internal representation (`rep`) may be dense or compressed,
41/// but externally the mesh always behaves as a complete mapping from
42/// rank index → value.
43///
44/// # Invariants
45/// - Complete: every rank in `region` has exactly one value.
46/// - Order: iteration and indexing follow the region's linearization.
47#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)] // derives hold only when `T` implements them
48pub struct ValueMesh<T> {
49 /// The logical multidimensional domain of the mesh.
50 ///
51 /// Determines the number of ranks (`region.num_ranks()`) and the
52 /// order in which they are traversed.
53 region: Region,
54
55 /// The underlying storage representation.
56 ///
57 /// - `Rep::Dense` stores a `Vec<T>` with one value per rank.
58 /// - `Rep::Compressed` stores a run-length encoded table of
59 /// unique values plus `(Range<usize>, u32)` pairs describing
60 /// contiguous runs of identical values.
61 ///
62 /// The representation is an internal optimization detail; all
63 /// public APIs (e.g. `values()`, `get()`, slicing) behave as if
64 /// the mesh were dense.
65 rep: Rep<T>,
66}
67
68/// A single run-length–encoded (RLE) segment within a [`ValueMesh`].
69///
70/// Each `Run` represents a contiguous range of ranks `[start, end)`
71/// that all share the same value, referenced indirectly via a table
72/// index `id`. This allows compact storage of large regions with
73/// repeated values.
74///
75/// Runs are serialized in a stable, portable format using `u64` for
76/// range bounds (`start`, `end`) to avoid platform-dependent `usize`
77/// encoding differences.
78#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
79struct Run {
80 /// Inclusive start of the contiguous range of ranks (0-based).
81 start: u64,
82 /// Exclusive end of the contiguous range of ranks (0-based).
83 end: u64,
84 /// Index into the value table for this run's shared value.
85 id: u32,
86}
87
88impl Run {
89 /// Creates a new `Run` covering ranks `[start, end)` that all
90 /// share the same table entry `id`.
91 ///
92 /// Converts `usize` bounds to `u64` for stable serialization.
93 fn new(start: usize, end: usize, id: u32) -> Self {
94 Self {
95 start: start as u64,
96 end: end as u64,
97 id,
98 }
99 }
100}
101
102impl TryFrom<Run> for (Range<usize>, u32) {
103 type Error = &'static str;
104
105 /// Converts a serialized [`Run`] back into its in-memory form
106 /// `(Range<usize>, u32)`.
107 ///
108 /// Performs checked conversion of the 64-bit wire fields back
109 /// into `usize` indices, returning an error if either bound
110 /// exceeds the platform's addressable range. This ensures safe
111 /// round-tripping between the serialized wire format and native
112 /// representation.
113 #[allow(clippy::result_large_err)]
114 fn try_from(r: Run) -> Result<Self, Self::Error> {
115 let start = usize::try_from(r.start).map_err(|_| "run.start too large")?;
116 let end = usize::try_from(r.end).map_err(|_| "run.end too large")?;
117 Ok((start..end, r.id))
118 }
119}
120
121/// Internal storage representation for a [`ValueMesh`].
122///
123/// This enum abstracts how the per-rank values are stored.
124/// Externally, both variants behave identically — the difference is
125/// purely in memory layout and access strategy.
126///
127/// - [`Rep::Dense`] stores one value per rank, directly.
128/// - [`Rep::Compressed`] stores a compact run-length-encoded form,
129/// reusing identical values across contiguous ranks.
130///
131/// Users of [`ValueMesh`] normally never interact with `Rep`
132/// directly; all iteration and slicing APIs present a dense logical
133/// view.
134#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)] // derives hold only when `T` implements them
135#[serde(tag = "rep", rename_all = "snake_case")]
136enum Rep<T> {
137 /// Fully expanded representation: one element per rank.
138 ///
139 /// The length of `values` is always equal to
140 /// `region.num_ranks()`. This form is simple and fast for
141 /// iteration and mutation but uses more memory when large runs of
142 /// repeated values are present.
143 Dense {
144 /// Flat list of values, one per rank in the region's
145 /// linearization order.
146 values: Vec<T>,
147 },
148
149 /// Run-length-encoded representation.
150 ///
151 /// Each run `(Range<usize>, id)` indicates that the ranks within
152 /// `Range` (half-open `[start, end)`) share the same value at
153 /// `table[id]`. The `table` stores each distinct value once.
154 ///
155 /// # Invariants
156 /// - Runs are non-empty and contiguous (`r.start < r.end`).
157 /// - Runs collectively cover `0..region.num_ranks()` with no gaps
158 /// or overlaps.
159 /// - `id` indexes into `table` (`id < table.len()`).
160 Compressed {
161 /// The deduplicated set of unique values referenced by runs.
162 table: Vec<T>,
163
164 /// List of `(range, table_id)` pairs describing contiguous
165 /// runs of identical values in region order.
166 runs: Vec<Run>,
167 },
168}
169
170// At this time, Default is used primarily to satisfy the mailbox
171// Accumulator bound. It constructs an empty (zero-rank) mesh. Other
172// contexts may also use this as a generic "empty mesh" initializer
173// when a concrete region is not yet known.
174impl<T> Default for ValueMesh<T> {
175 fn default() -> Self {
176 Self::empty()
177 }
178}
179
180impl<T> ValueMesh<T> {
181 /// Returns an *empty* mesh: a 1-dimensional region of length 0.
182 ///
183 /// This differs from a *dimensionless* (0-D) region, which
184 /// represents a single scalar element. A zero-length 1-D region
185 /// has **no elements at all**, making it the natural `Default`
186 /// placeholder for accumulator state initialization.
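    ///
    /// # Examples
    /// A minimal sketch (not compiled here; assumes the `Ranked` and
    /// `ViewExt` traits are in scope for `region()` / `values()`):
    /// ```ignore
    /// let m = ValueMesh::<u32>::empty();
    /// assert_eq!(m.region().num_ranks(), 0);
    /// assert_eq!(m.values().count(), 0);
    /// ```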
187 pub fn empty() -> Self {
188 // zero-rank region; no constraints on T
189 let region = extent!(r = 0).into();
190 Self::new_unchecked(region, Vec::<T>::new())
191 }
192
193 /// Creates a new `ValueMesh` for `region` with exactly one value
194 /// per rank.
195 ///
196 /// # Invariants
197 /// This constructor validates that the number of provided values
198 /// (`ranks.len()`) matches the region's cardinality
199 /// (`region.num_ranks()`). A value mesh must be complete: every
200 /// rank in `region` has a corresponding `T`.
201 ///
202 /// # Errors
203 /// Returns [`Error::InvalidRankCardinality`] if `ranks.len() !=
204 /// region.num_ranks()`.
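    ///
    /// # Examples
    /// A sketch of typical construction (not compiled here; assumes
    /// `extent!` and `Region` are in scope):
    /// ```ignore
    /// let region: Region = extent!(x = 2, y = 3).into();
    /// let mesh = ValueMesh::new(region, (0..6).collect())?;
    /// assert_eq!(mesh.values().count(), 6);
    /// ```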
206 #[allow(clippy::result_large_err)]
207 pub(crate) fn new(region: Region, ranks: Vec<T>) -> crate::v1::Result<Self> {
208 let (actual, expected) = (ranks.len(), region.num_ranks());
209 if actual != expected {
210 return Err(crate::v1::Error::InvalidRankCardinality { expected, actual });
211 }
212 Ok(Self {
213 region,
214 rep: Rep::Dense { values: ranks },
215 })
216 }
217
218 /// Constructs a `ValueMesh` without checking cardinality. Caller
219 /// must ensure `ranks.len() == region.num_ranks()`.
220 #[inline]
221 pub(crate) fn new_unchecked(region: Region, ranks: Vec<T>) -> Self {
222 debug_assert_eq!(region.num_ranks(), ranks.len());
223 Self {
224 region,
225 rep: Rep::Dense { values: ranks },
226 }
227 }
228}
229
230impl<T: Clone> ValueMesh<T> {
231 /// Builds a `ValueMesh` that assigns the single value `s` to
232 /// every rank in `region`, without materializing a dense
233 /// `Vec<T>`. The result is stored in compressed (RLE) form as a
234 /// single run `[0..N)`.
235 ///
236 /// If `region.num_ranks() == 0`, the mesh contains no runs (and
237 /// an empty table), regardless of `s`.
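    ///
    /// # Examples
    /// A sketch (not compiled here):
    /// ```ignore
    /// let region: Region = extent!(n = 4).into();
    /// let vm = ValueMesh::from_single(region, 7);
    /// assert_eq!(vm.values().collect::<Vec<_>>(), vec![7, 7, 7, 7]);
    /// ```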
238 pub fn from_single(region: Region, s: T) -> Self {
239 let n = region.num_ranks();
240 if n == 0 {
241 return Self {
242 region,
243 rep: Rep::Compressed {
244 table: Vec::new(),
245 runs: Vec::new(),
246 },
247 };
248 }
249
250 let table = vec![s];
251 let runs = vec![Run::new(0, n, 0)];
252 Self {
253 region,
254 rep: Rep::Compressed { table, runs },
255 }
256 }
257}
258
259impl<T: Clone + Default> ValueMesh<T> {
260 /// Builds a [`ValueMesh`] covering the region, filled with
261 /// `T::default()`.
262 ///
263    /// Equivalent to
264    /// `ValueMesh::from_single(region, T::default())`.
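    ///
    /// # Examples
    /// A sketch (not compiled here):
    /// ```ignore
    /// let region: Region = extent!(n = 3).into();
    /// let vm = ValueMesh::<i32>::from_default(region);
    /// assert_eq!(vm.values().collect::<Vec<_>>(), vec![0, 0, 0]);
    /// ```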
265 pub fn from_default(region: Region) -> Self {
266 ValueMesh::<T>::from_single(region, T::default())
267 }
268}
269
270impl<T: Eq + Hash> ValueMesh<T> {
271 /// Builds a compressed mesh from a default value and a set of
272 /// disjoint ranges that override the default.
273 ///
274 /// - `ranges` may be in any order; they must be non-empty,
275 /// in-bounds, and non-overlapping.
276 /// - Unspecified ranks are filled with `default`.
277 /// - Result is stored in RLE form; no dense `Vec<T>` is
278 /// materialized.
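    ///
    /// # Examples
    /// A sketch (not compiled here): ranks `1..3` override the default.
    /// ```ignore
    /// let region: Region = extent!(n = 5).into();
    /// let vm = ValueMesh::from_ranges_with_default(region, 0, vec![(1..3, 9)])?;
    /// assert_eq!(vm.values().collect::<Vec<_>>(), vec![0, 9, 9, 0, 0]);
    /// ```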
279 #[allow(clippy::result_large_err)]
280 pub fn from_ranges_with_default(
281 region: Region,
282 default: T,
283 mut ranges: Vec<(Range<usize>, T)>,
284 ) -> crate::v1::Result<Self> {
285 let n = region.num_ranks();
286
287 if n == 0 {
288 return Ok(Self {
289 region,
290 rep: Rep::Compressed {
291 table: Vec::new(),
292 runs: Vec::new(),
293 },
294 });
295 }
296
297 // Validate: non-empty, in-bounds; then sort.
298 for (r, _) in &ranges {
299 if r.is_empty() {
300 return Err(crate::v1::Error::InvalidRankCardinality {
301 expected: n,
302 actual: 0,
303                }); // TODO: this surfaces the error but it's not a great fit
304 }
305 if r.end > n {
306 return Err(crate::v1::Error::InvalidRankCardinality {
307 expected: n,
308 actual: r.end,
309 });
310 }
311 }
312 ranges.sort_by_key(|(r, _)| (r.start, r.end));
313
314 // Validate: non-overlapping.
315 for w in ranges.windows(2) {
316 let (a, _) = &w[0];
317 let (b, _) = &w[1];
318 if a.end > b.start {
319 // Overlap
320 return Err(crate::v1::Error::InvalidRankCardinality {
321 expected: n,
322 actual: b.start, // TODO: this surfaces the error but is a bad fit
323 });
324 }
325 }
326
327 // Internal index: value -> table id (assigned once).
328 let mut index: HashMap<T, u32> = HashMap::with_capacity(1 + ranges.len());
329 let mut next_id: u32 = 0;
330
331 // Helper: assign or reuse an id by value, taking ownership of
332 // v.
333 let id_of = |v: T, index: &mut HashMap<T, u32>, next_id: &mut u32| -> u32 {
334 match index.entry(v) {
335 Entry::Occupied(o) => *o.get(),
336 Entry::Vacant(vac) => {
337 let id = *next_id;
338 *next_id += 1;
339 vac.insert(id);
340 id
341 }
342 }
343 };
344
345 let default_id = id_of(default, &mut index, &mut next_id);
346
347 let mut runs: Vec<Run> = Vec::with_capacity(1 + 2 * ranges.len());
348 let mut cursor = 0usize;
349
350 for (r, v) in ranges.into_iter() {
351 // Fill default gap if any.
352 if cursor < r.start {
353 runs.push(Run::new(cursor, r.start, default_id));
354 }
355 // Override block.
356 let id = id_of(v, &mut index, &mut next_id);
357 runs.push(Run::new(r.start, r.end, id));
358 cursor = r.end;
359 }
360
361 // Trailing default tail.
362 if cursor < n {
363 runs.push(Run::new(cursor, n, default_id));
364 }
365
366 // Materialize table in id order without cloning: move keys
367 // out of the map.
368 let mut table_slots: Vec<Option<T>> = Vec::new();
369 table_slots.resize_with(next_id as usize, || None);
370
371 for (t, id) in index.into_iter() {
372 table_slots[id as usize] = Some(t);
373 }
374
375 let table: Vec<T> = table_slots
376 .into_iter()
377 .map(|o| o.expect("every id must be assigned"))
378 .collect();
379
380 Ok(Self {
381 region,
382 rep: Rep::Compressed { table, runs },
383 })
384 }
385
386 /// Builds a [`ValueMesh`] from a fully materialized dense vector
387 /// of per-rank values, then compresses it into run-length–encoded
388 /// form if possible.
389 ///
390 /// This constructor is intended for callers that already have one
391 /// value per rank (e.g. computed or received data) but wish to
392 /// store it efficiently.
393 ///
394 /// # Parameters
395 /// - `region`: The logical region describing the mesh's shape and
396 /// rank order.
397 /// - `values`: A dense vector of values, one per rank in
398 /// `region`.
399 ///
400 /// # Returns
401 /// A [`ValueMesh`] whose internal representation is `Compressed`
402 /// if any adjacent elements are equal, or `Dense` if no
403 /// compression was possible.
404 ///
405 /// # Errors
406 /// Returns an error if the number of provided `values` does not
407 /// match the number of ranks in `region`.
408 ///
409 /// # Examples
410 /// ```ignore
411 /// let region: Region = extent!(n = 5).into();
412 /// let mesh = ValueMesh::from_dense(region, vec![1, 1, 2, 2, 3]).unwrap();
413 /// // Internally compressed to three runs: [1, 1], [2, 2], [3]
414 /// ```
415 #[allow(clippy::result_large_err)]
416 pub fn from_dense(region: Region, values: Vec<T>) -> crate::v1::Result<Self> {
417 let mut vm = Self::new(region, values)?;
418 vm.compress_adjacent_in_place();
419 Ok(vm)
420 }
421}
422
423impl<F: Future> ValueMesh<F> {
424 /// Await all futures in the mesh, yielding a `ValueMesh` of their
425 /// outputs.
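    ///
    /// # Examples
    /// A sketch (not compiled here; assumes an async context):
    /// ```ignore
    /// let region: Region = extent!(x = 2).into();
    /// let futs = ValueMesh::new(region, vec![future::ready(1), future::ready(2)])?;
    /// let joined = futs.join().await;
    /// assert_eq!(joined.values().collect::<Vec<_>>(), vec![1, 2]);
    /// ```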
426 pub async fn join(self) -> ValueMesh<F::Output> {
427 let ValueMesh { region, rep } = self;
428
429 match rep {
430 Rep::Dense { values } => {
431 let results = futures::future::join_all(values).await;
432 ValueMesh::new_unchecked(region, results)
433 }
434 Rep::Compressed { .. } => {
435 unreachable!("join() not implemented for compressed meshes")
436 }
437 }
438 }
439}
440
441impl<T, E> ValueMesh<Result<T, E>> {
442 /// Transposes a `ValueMesh<Result<T, E>>` into a
443 /// `Result<ValueMesh<T>, E>`.
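    ///
    /// # Examples
    /// A sketch (not compiled here):
    /// ```ignore
    /// let region: Region = extent!(x = 2).into();
    /// let mesh = ValueMesh::new(region, vec![Ok::<_, String>(1), Ok(2)]).unwrap();
    /// let ok = mesh.transpose().unwrap();
    /// assert_eq!(ok.values().collect::<Vec<_>>(), vec![1, 2]);
    /// ```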
444 pub fn transpose(self) -> Result<ValueMesh<T>, E> {
445 let ValueMesh { region, rep } = self;
446
447 match rep {
448 Rep::Dense { values } => {
449 let values = values.into_iter().collect::<Result<Vec<T>, E>>()?;
450 Ok(ValueMesh::new_unchecked(region, values))
451 }
452 Rep::Compressed { table, runs } => {
453 let table: Vec<T> = table.into_iter().collect::<Result<Vec<T>, E>>()?;
454 Ok(ValueMesh {
455 region,
456 rep: Rep::Compressed { table, runs },
457 })
458 }
459 }
460 }
461}
462
463impl<T: 'static> view::Ranked for ValueMesh<T> {
464 type Item = T;
465
466 /// Returns the region that defines this mesh's shape and rank
467 /// order.
468 fn region(&self) -> &Region {
469 &self.region
470 }
471
472 /// Looks up the value at the given linearized rank.
473 ///
474 /// Works transparently for both dense and compressed
475 /// representations:
476 /// - In the dense case, it simply indexes into the `values`
477 /// vector.
478 /// - In the compressed case, it performs a binary search over run
479 /// boundaries to find which run contains the given rank, then
480 /// returns the corresponding entry from `table`.
481 ///
482 /// Returns `None` if the rank is out of bounds.
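    ///
    /// A sketch of lookups on a compressed mesh (not compiled here):
    /// ```ignore
    /// let region: Region = extent!(n = 4).into();
    /// let vm = ValueMesh::from_single(region, 7);
    /// assert_eq!(vm.get(3), Some(&7));
    /// assert_eq!(vm.get(4), None); // out of bounds
    /// ```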
483 fn get(&self, rank: usize) -> Option<&Self::Item> {
484 if rank >= self.region.num_ranks() {
485 return None;
486 }
487
488 match &self.rep {
489 Rep::Dense { values } => values.get(rank),
490
491 Rep::Compressed { table, runs } => {
492 let rank = rank as u64;
493
494 // Binary search over runs: find the one whose range
495 // contains `rank`.
496 let idx = runs
497 .binary_search_by(|run| {
498 if run.end <= rank {
499 Ordering::Less
500 } else if run.start > rank {
501 Ordering::Greater
502 } else {
503 Ordering::Equal
504 }
505 })
506 .ok()?;
507
508 // Map the run's table ID to its actual value.
509 let id = runs[idx].id as usize;
510 table.get(id)
511 }
512 }
513 }
514}
515
516impl<T: Clone + 'static> view::RankedSliceable for ValueMesh<T> {
517 fn sliced(&self, region: Region) -> Self {
518 debug_assert!(region.is_subset(self.region()), "sliced: not a subset");
519 let ranks: Vec<T> = self
520 .region()
521            .remap(&region)
522 .unwrap()
523 .map(|index| self.get(index).unwrap().clone())
524 .collect();
525 debug_assert_eq!(
526 region.num_ranks(),
527 ranks.len(),
528 "sliced: cardinality mismatch"
529 );
530 Self::new_unchecked(region, ranks)
531 }
532}
533
534impl<T> view::BuildFromRegion<T> for ValueMesh<T> {
535 type Error = crate::v1::Error;
536
537 fn build_dense(region: Region, values: Vec<T>) -> Result<Self, Self::Error> {
538 Self::new(region, values)
539 }
540
541 fn build_dense_unchecked(region: Region, values: Vec<T>) -> Self {
542 Self::new_unchecked(region, values)
543 }
544}
545
546impl<T> view::BuildFromRegionIndexed<T> for ValueMesh<T> {
547 type Error = crate::v1::Error;
548
549 fn build_indexed(
550 region: Region,
551 pairs: impl IntoIterator<Item = (usize, T)>,
552 ) -> Result<Self, Self::Error> {
553 let n = region.num_ranks();
554
555 // Allocate uninitialized buffer for T.
556 // Note: Vec<MaybeUninit<T>>'s Drop will only free the
557 // allocation; it never runs T's destructor. We must
558 // explicitly drop any initialized elements (DropGuard) or
559 // convert into Vec<T>.
560 let mut buf: Vec<MaybeUninit<T>> = Vec::with_capacity(n);
561 // SAFETY: set `len = n` to treat the buffer as n uninit slots
562 // of `MaybeUninit<T>`. We never read before `ptr::write`,
563 // drop only slots marked initialized (bitset), and convert to
564 // `Vec<T>` only once all 0..n are initialized (guard enforces
565 // this).
566 unsafe {
567 buf.set_len(n);
568 }
569
570 // Compact bitset for occupancy.
571 let words = n.div_ceil(64);
572 let mut bits = vec![0u64; words];
573 let mut filled = 0usize;
574
575 // Drop guard: cleans up initialized elements on early exit.
576 // Stores raw, non-borrowed pointers (`NonNull`), so we don't
577 // hold Rust references for the whole scope. This allows
578 // mutating `buf`/`bits` inside the loop while still letting
579 // the guard access them if dropped early.
580 struct DropGuard<T> {
581 buf: NonNull<MaybeUninit<T>>,
582 bits: NonNull<u64>,
583 n_elems: usize,
584 n_words: usize,
585 disarm: bool,
586 }
587
588 impl<T> DropGuard<T> {
589 /// # Safety
590 /// - `buf` points to `buf.len()` contiguous
591 /// `MaybeUninit<T>` and outlives the guard.
592 /// - `bits` points to `bits.len()` contiguous `u64` words
593 /// and outlives the guard.
594 /// - For every set bit `i` in `bits`, `buf[i]` has been
595 /// initialized with a valid `T` (and on duplicates, the
596 /// previous `T` at `i` was dropped before overwrite).
597 /// - While the guard is alive, caller may mutate
598 /// `buf`/`bits` (the guard holds only raw pointers; it
599 /// does not create Rust borrows).
600 /// - On drop (when not disarmed), the guard will read
601 /// `bits`, mask tail bits in the final word, and
602 /// `drop_in_place` each initialized `buf[i]`—never
603 /// accessing beyond `buf.len()` / `bits.len()`.
604 /// - If a slice is empty, the stored pointer may be
605 /// `dangling()`; it is never dereferenced when the
606 /// corresponding length is zero.
607 unsafe fn new(buf: &mut [MaybeUninit<T>], bits: &mut [u64]) -> Self {
608 let n_elems = buf.len();
609 let n_words = bits.len();
610 // Invariant typically: n_words == (n_elems + 63) / 64
611 // but we don't *require* it; tail is masked in Drop.
612 Self {
613 buf: NonNull::new(buf.as_mut_ptr()).unwrap_or_else(NonNull::dangling),
614 bits: NonNull::new(bits.as_mut_ptr()).unwrap_or_else(NonNull::dangling),
615 n_elems,
616 n_words,
617 disarm: false,
618 }
619 }
620
621 #[inline]
622 fn disarm(&mut self) {
623 self.disarm = true;
624 }
625 }
626
627 impl<T> Drop for DropGuard<T> {
628 fn drop(&mut self) {
629 if self.disarm {
630 return;
631 }
632
633 // SAFETY:
634 // - `self.buf` points to `n_elems` contiguous
635 // `MaybeUninit<T>` slots (or may be dangling if
636 // `n_elems == 0`); `self.bits` points to `n_words`
637 // contiguous `u64` words (or may be dangling if
638 // `n_words == 0`).
639 // - Loop bounds ensure `w < n_words` when reading
640 // `*bits_base.add(w)`.
641 // - For the final word we mask unused tail bits so
642 // any computed index `i = w * 64 + tz` always
643 // satisfies `i < n_elems` before we dereference
644 // `buf_base.add(i)`.
645 // - Only slots whose bits are set are dropped, so no
646 // double-drops.
647 // - No aliasing with active Rust borrows: the guard
648 // holds raw pointers and runs in `Drop` after the
649 // fill loop.
650 unsafe {
651 let buf_base = self.buf.as_ptr();
652 let bits_base = self.bits.as_ptr();
653
654 for w in 0..self.n_words {
655 // Load word.
656 let mut word = *bits_base.add(w);
657
658 // Mask off bits beyond `n_elems` in the final
659 // word (if any).
660 if w == self.n_words.saturating_sub(1) {
661 let used_bits = self.n_elems.saturating_sub(w * 64);
662 if used_bits < 64 {
663 let mask = if used_bits == 0 {
664 0
665 } else {
666 (1u64 << used_bits) - 1
667 };
668 word &= mask;
669 }
670 }
671
672 // Fast scan set bits.
673 while word != 0 {
674 let tz = word.trailing_zeros() as usize;
675 let i = w * 64 + tz;
676 debug_assert!(i < self.n_elems);
677
678 let slot = buf_base.add(i);
679 // Drop the initialized element.
680 ptr::drop_in_place((*slot).as_mut_ptr());
681
682 // clear the bit we just handled
683 word &= word - 1;
684 }
685 }
686 }
687 }
688 }
689
690 // SAFETY:
691 // - `buf` and `bits` are freshly allocated Vecs with
692 // capacity/len set to cover exactly `n_elems` and `n_words`,
693 // so their `.as_mut_ptr()` is valid for that many elements.
694 // - Both slices live at least as long as the guard, and are
695 // not moved until after the guard is disarmed.
696 // - No aliasing occurs: the guard holds only raw pointers and
697 // the fill loop mutates through those same allocations.
698 let mut guard = unsafe { DropGuard::new(&mut buf, &mut bits) };
699
700 for (rank, value) in pairs {
701 // Single bounds check up front.
702 if rank >= guard.n_elems {
703 return Err(crate::v1::Error::InvalidRankCardinality {
704 expected: guard.n_elems,
705 actual: rank + 1,
706 });
707 }
708
709 // Compute word index and bit mask once.
710 let w = rank / 64;
711 let b = rank % 64;
712 let mask = 1u64 << b;
713
714 // SAFETY:
715 // - `rank < guard.n_elems` was checked above, so
716 // `buf_slot = buf.add(rank)` is within the
717 // `Vec<MaybeUninit<T>>` allocation.
718 // - `w = rank / 64` and `bits.len() == (n + 63) / 64`
719 // ensure `bits_ptr = bits.add(w)` is in-bounds.
720 // - If `(word & mask) != 0`, then this slot was
721 // previously initialized;
722 // `drop_in_place((*buf_slot).as_mut_ptr())` is valid and
723 // leaves the slot uninitialized.
724 // - If the bit was clear, we set it and count `filled +=
725 // 1`.
726 // - `(*buf_slot).write(value)` is valid in both cases:
727 // either writing into an uninitialized slot or
728 // immediately after dropping the prior `T`.
729 // - No aliasing with Rust references: the guard holds raw
730 // pointers and we have exclusive ownership of
731 // `buf`/`bits` within this function.
732 unsafe {
733 // Pointers from the guard (no long-lived & borrows).
734 let bits_ptr = guard.bits.as_ptr().add(w);
735 let buf_slot = guard.buf.as_ptr().add(rank);
736
737 // Read the current word.
738 let word = *bits_ptr;
739
740 if (word & mask) != 0 {
741 // Duplicate: drop old value before overwriting.
742 core::ptr::drop_in_place((*buf_slot).as_mut_ptr());
743 // (Bit already set; no need to set again; don't
744 // bump `filled`.)
745 } else {
746 // First time we see this rank.
747 *bits_ptr = word | mask;
748 filled += 1;
749 }
750
751 // Write new value into the slot.
752 (*buf_slot).write(value);
753 }
754 }
755
756 if filled != n {
757 // Missing ranks: actual = number of distinct ranks seen.
758 return Err(crate::v1::Error::InvalidRankCardinality {
759 expected: n,
760 actual: filled,
761 });
762 }
763
764 // Success: prevent cleanup.
765 guard.disarm();
766
767 // SAFETY: all n slots are initialized
768 let ranks = unsafe {
769 let ptr = buf.as_mut_ptr() as *mut T;
770 let len = buf.len();
771 let cap = buf.capacity();
772 // Prevent `buf` (Vec<MaybeUninit<T>>) from freeing the
773 // allocation. Ownership of the buffer is about to be
774 // transferred to `Vec<T>` via `from_raw_parts`.
775 // Forgetting avoids a double free.
776 mem::forget(buf);
777 Vec::from_raw_parts(ptr, len, cap)
778 };
779
780 Ok(Self::new_unchecked(region, ranks))
781 }
782}
783
784impl<T: PartialEq> ValueMesh<T> {
785 /// Compresses the mesh in place using run-length encoding (RLE).
786 ///
787 /// This method scans the mesh's dense values, coalescing adjacent
788 /// runs of identical elements into a compact [`Rep::Compressed`]
789 /// representation. It replaces the internal storage (`rep`) with
790 /// the compressed form.
791 ///
792 /// # Behavior
793 /// - If the mesh is already compressed, this is a **no-op**.
794 /// - If the mesh is dense, it consumes the current `Vec<T>` and
795 /// rebuilds the representation as a run table plus value table.
796 /// - Only *adjacent* equal values are merged; non-contiguous
797 /// duplicates remain distinct.
798 ///
799 /// # Requirements
800 /// - `T` must implement [`PartialEq`] (to detect equal values).
801 ///
802 /// This operation is lossless: expanding the compressed mesh back
803 /// into a dense vector yields the same sequence of values.
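    ///
    /// # Examples
    /// A sketch (not compiled here):
    /// ```ignore
    /// let region: Region = extent!(n = 5).into();
    /// let mut vm = ValueMesh::new(region, vec![1, 1, 2, 2, 3])?;
    /// vm.compress_adjacent_in_place();
    /// // Logical contents are unchanged; only the representation shrinks.
    /// assert_eq!(vm.values().collect::<Vec<_>>(), vec![1, 1, 2, 2, 3]);
    /// ```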
804 pub fn compress_adjacent_in_place(&mut self) {
805 self.compress_adjacent_in_place_by(|a, b| a == b)
806 }
807}
808
809impl<T: Clone + PartialEq> ValueMesh<T> {
810 /// Materializes this mesh into a vector of `(Range<usize>, T)`
811 /// runs.
812 ///
813 /// For a dense representation, this walks the value vector and
814 /// groups adjacent equal values into contiguous runs. The result
815 /// is equivalent to what would be stored in a compressed
816 /// representation, but the mesh itself is not mutated or
817 /// re-encoded. This is purely a read-only view.
818 ///
819 /// For a compressed representation, the stored runs are simply
820 /// cloned.
821 ///
822 /// This method is intended for inspection, testing, and
823 /// diff/merge operations that need a uniform view of value runs
824 /// without changing the underlying representation.
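    ///
    /// A sketch of the produced runs (not compiled here):
    /// ```ignore
    /// let region: Region = extent!(n = 5).into();
    /// let vm = ValueMesh::new(region, vec![1, 1, 2, 2, 3])?;
    /// assert_eq!(
    ///     vm.materialized_runs(),
    ///     vec![(0..2, 1), (2..4, 2), (4..5, 3)]
    /// );
    /// ```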
825 fn materialized_runs(&self) -> Vec<(Range<usize>, T)> {
826 match &self.rep {
827 Rep::Dense { values } => {
828 // Coalesce adjacent equals into runs.
829 let mut out = Vec::new();
830 if values.is_empty() {
831 return out;
832 }
833 let mut start = 0usize;
834 for i in 1..values.len() {
835 if values[i] != values[i - 1] {
836 out.push((start..i, values[i - 1].clone()));
837 start = i;
838 }
839 }
840 out.push((start..values.len(), values.last().unwrap().clone()));
841 out
842 }
843 Rep::Compressed { table, runs } => runs
844 .iter()
845 .map(|r| {
846 let id = r.id as usize;
847 ((r.start as usize..r.end as usize), table[id].clone())
848 })
849 .collect(),
850 }
851 }
852}
853
854impl<T: Clone + Eq> ValueMesh<T> {
855 /// Merge a sparse overlay into this mesh.
856 ///
857 /// Overlay segments are applied with **last-writer-wins**
858 /// precedence on overlap (identical to `RankedValues::merge_from`
859 /// behavior). The result is stored compressed.
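    ///
    /// # Examples
    /// A sketch (not compiled here; constructs the overlay via
    /// `ValueOverlay::try_from_runs`, as used elsewhere in this module):
    /// ```ignore
    /// let region: Region = extent!(n = 5).into();
    /// let mut mesh = ValueMesh::from_single(region, 0);
    /// let overlay = ValueOverlay::try_from_runs(vec![(1..3, 9)])?;
    /// mesh.merge_from_overlay(overlay)?;
    /// assert_eq!(mesh.values().collect::<Vec<_>>(), vec![0, 9, 9, 0, 0]);
    /// ```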
860 pub fn merge_from_overlay(&mut self, overlay: ValueOverlay<T>) -> Result<(), BuildError> {
861 let n = self.region.num_ranks();
862
863 // Bounds validation (structure already validated by
864 // ValueOverlay).
865 for (r, _) in overlay.runs() {
866 if r.end > n {
867 return Err(BuildError::OutOfBounds {
868 range: r.clone(),
869 region_len: n,
870 });
871 }
872 }
873
874 // Left: current mesh as normalized value-bearing runs.
875 let left = self.materialized_runs();
876 // Right: overlay runs (already sorted, non-overlapping,
877 // coalesced).
878 let right: Vec<(std::ops::Range<usize>, T)> = overlay.runs().cloned().collect();
879
880 // Merge with overlay precedence, reusing the same splitting
881 // strategy as RankedValues::merge_from.
882 let merged = rle::merge_value_runs(left, right);
883
884 // Re-encode to compressed representation:
885 let (table, raw_runs) = rle::rle_from_value_runs(merged);
886 let runs = raw_runs
887 .into_iter()
888 .map(|(r, id)| Run::new(r.start, r.end, id))
889 .collect();
890 self.rep = Rep::Compressed { table, runs };
891
892 Ok(())
893 }
894}
895
896impl<T> ValueMesh<T> {
897 /// Compresses the mesh in place using a custom equivalence
898 /// predicate.
899 ///
900 /// This is a generalized form of [`compress_adjacent_in_place`]
901 /// that merges adjacent values according to an arbitrary
902 /// predicate `same(a, b)`, rather than relying on `PartialEq`.
903 ///
904 /// # Behavior
905 /// - If the mesh is already compressed, this is a **no-op**.
906 /// - Otherwise, consumes the dense `Vec<T>` and replaces it with
907 /// a run-length encoded (`Rep::Compressed`) representation,
908 /// where consecutive elements satisfying `same(a, b)` are
909 /// coalesced into a single run.
910 ///
911 /// # Requirements
912 /// - The predicate must be reflexive and symmetric for
913 /// correctness.
914 ///
915 /// This operation is lossless: expanding the compressed mesh
916 /// reproduces the original sequence exactly under the same
917 /// equivalence.
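    ///
    /// # Examples
    /// A sketch using a coarser equivalence (not compiled here):
    /// ```ignore
    /// let region: Region = extent!(n = 4).into();
    /// let mut vm = ValueMesh::new(region, vec![1, 2, 11, 12])?;
    /// // Values in the same decade are treated as equivalent, so each
    /// // pair collapses into one run keyed by its first representative.
    /// vm.compress_adjacent_in_place_by(|a, b| a / 10 == b / 10);
    /// ```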
918 pub fn compress_adjacent_in_place_by<F>(&mut self, same: F)
919 where
920 F: FnMut(&T, &T) -> bool,
921 {
922 let values = match &mut self.rep {
923 Rep::Dense { values } => std::mem::take(values),
924 Rep::Compressed { .. } => return,
925 };
926 let (table, raw_runs) = rle::rle_from_dense(values, same);
927 let runs = raw_runs
928 .into_iter()
929 .map(|(r, id)| Run::new(r.start, r.end, id))
930 .collect();
931 self.rep = Rep::Compressed { table, runs };
932 }
933}
934
935/// Accumulates sparse overlay updates into an authoritative mesh.
936///
937/// Lifecycle:
938/// - Mailbox initializes `State` via `Default` (empty mesh).
939/// - On the first update, the accumulator clones `self` (the template
940/// passed to `open_accum_port_opts`) into `state`. Callers pass a
941/// template such as `StatusMesh::from_single(region, NotExist)`.
942/// - Each overlay update is merged with right-wins semantics via
943/// `merge_from_overlay`, and the accumulator emits a *full*
944/// ValueMesh.
945///
946/// The accumulator's state is a [`ValueMesh<T>`] and its updates are
947/// [`ValueOverlay<T>`] instances. On each update, the overlay’s
948/// normalized runs are merged into the mesh using
949/// [`ValueMesh::merge_from_overlay`] with right-wins semantics.
950///
951/// This enables incremental reduction of sparse updates across
952/// distributed reducers without materializing dense data.
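///
/// A sketch of one accumulate step (not compiled here; `overlay` stands
/// for an incoming `ValueOverlay` update and `NotExist` for the caller's
/// template value):
/// ```ignore
/// let template = ValueMesh::from_single(region.clone(), NotExist);
/// let mut state = ValueMesh::default(); // mailbox-provided initial state
/// template.accumulate(&mut state, overlay)?;
/// // `state` now covers the full region, with `overlay` applied right-wins.
/// ```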
953impl<T> Accumulator for ValueMesh<T>
954where
955 T: Eq + Clone + Named,
956{
957 type State = Self;
958 type Update = ValueOverlay<T>;
959
960 fn accumulate(&self, state: &mut Self::State, update: Self::Update) -> anyhow::Result<()> {
961 // Mailbox starts with A::State::default() (empty). On the
962 // first update, re-initialize to our template (self), which
963 // the caller constructed as "full NotExist over the target
964 // region".
965 if state.region().num_ranks() == 0 {
966 *state = self.clone();
967 }
968
969 // Apply sparse delta into the authoritative mesh
970 // (right-wins).
971 state.merge_from_overlay(update)?;
972 Ok(())
973 }
974
975 fn reducer_spec(&self) -> Option<ReducerSpec> {
976 Some(ReducerSpec {
977 typehash: <ValueOverlayReducer<T> as Named>::typehash(),
978 builder_params: None,
979 })
980 }
981}
982
983/// Marker reducer type for [`ValueOverlay<T>`].
984///
985/// This reducer carries no state; it exists only to bind a concrete
986/// type parameter `T` to the [`CommReducer`] implementation below.
987/// Reduction is purely functional and uses right-wins merge semantics
988/// defined in `rle::merge_value_runs`.
989#[derive(Named)]
990struct ValueOverlayReducer<T>(std::marker::PhantomData<T>);
991
992/// Reducer for sparse overlay updates.
993///
994/// Combines two [`ValueOverlay<T>`] updates using right-wins
995/// semantics: overlapping runs from `right` overwrite those in
996/// `left`. The merged runs are then normalized and validated via
997/// [`ValueOverlay::try_from_runs`].
998///
999/// Used by the corresponding [`Accumulator`] to perform distributed
1000/// reduction of incremental mesh updates.
1001impl<T> CommReducer for ValueOverlayReducer<T>
1002where
1003 T: Eq + Clone + Named,
1004{
1005 type Update = ValueOverlay<T>;
1006
1007 // Last-writer-wins merge of two sparse overlays.
1008 fn reduce(&self, left: Self::Update, right: Self::Update) -> anyhow::Result<Self::Update> {
1009 // 1) Merge runs with right precedence.
1010 let merged = crate::v1::value_mesh::rle::merge_value_runs(
1011 left.runs().cloned().collect(),
1012 right.runs().cloned().collect(),
1013 );
1014 // 2) Re-normalize to an overlay (validates, coalesces).
1015 Ok(ValueOverlay::try_from_runs(merged)?)
1016 }
1017}
1018
1019// register for concrete types:
1020
1021hyperactor::submit! {
1022 ReducerFactory {
1023 typehash_f: <ValueOverlayReducer<crate::resource::Status> as Named>::typehash,
1024 builder_f: |_| Ok(Box::new(ValueOverlayReducer::<crate::resource::Status>(PhantomData))),
1025 }
1026}
1027
1028#[cfg(test)]
1029mod tests {
1030 use std::convert::Infallible;
1031 use std::future::Future;
1032 use std::pin::Pin;
1033 use std::task::Context;
1034 use std::task::Poll;
1035 use std::task::RawWaker;
1036 use std::task::RawWakerVTable;
1037 use std::task::Waker;
1038
1039 use futures::executor::block_on;
1040 use futures::future;
1041 use ndslice::extent;
1042 use ndslice::strategy::gen_region;
1043 use ndslice::view::CollectExactMeshExt;
1044 use ndslice::view::CollectIndexedMeshExt;
1045 use ndslice::view::CollectMeshExt;
1046 use ndslice::view::MapIntoExt;
1047 use ndslice::view::Ranked;
1048 use ndslice::view::RankedSliceable;
1049 use ndslice::view::ViewExt;
1050 use proptest::prelude::*;
1051 use proptest::strategy::ValueTree;
1052 use serde_json;
1053
1054 use super::*;
1055
1056 #[test]
1057 fn value_mesh_new_ok() {
1058 let region: Region = extent!(replica = 2, gpu = 3).into();
1059 let mesh = ValueMesh::new(region.clone(), (0..6).collect()).expect("new should succeed");
1060 assert_eq!(mesh.region().num_ranks(), 6);
1061 assert_eq!(mesh.values().count(), 6);
1062 assert_eq!(mesh.values().collect::<Vec<_>>(), vec![0, 1, 2, 3, 4, 5]);
1063 }
1064
1065 #[test]
1066 fn value_mesh_new_len_mismatch_is_error() {
1067 let region: Region = extent!(replica = 2, gpu = 3).into();
1068 let err = ValueMesh::new(region, vec![0_i32; 5]).unwrap_err();
1069 match err {
1070 crate::v1::Error::InvalidRankCardinality { expected, actual } => {
1071 assert_eq!(expected, 6);
1072 assert_eq!(actual, 5);
1073 }
1074 other => panic!("unexpected error: {other:?}"),
1075 }
1076 }
1077
1078 #[test]
1079 fn value_mesh_transpose_ok_and_err() {
1080 let region: Region = extent!(x = 2).into();
1081
1082 // ok case
1083 let ok_mesh = ValueMesh::new(region.clone(), vec![Ok::<_, Infallible>(1), Ok(2)]).unwrap();
1084 let ok = ok_mesh.transpose().unwrap();
1085 assert_eq!(ok.values().collect::<Vec<_>>(), vec![1, 2]);
1086
1087 // err case: propagate user E
1088 #[derive(Debug, PartialEq)]
1089 enum E {
1090 Boom,
1091 }
1092 let err_mesh = ValueMesh::new(region, vec![Ok(1), Err(E::Boom)]).unwrap();
1093 let err = err_mesh.transpose().unwrap_err();
1094 assert_eq!(err, E::Boom);
1095 }
1096
1097 #[test]
1098 fn value_mesh_join_preserves_region_and_values() {
1099 let region: Region = extent!(x = 2, y = 2).into();
1100 let futs = vec![
1101 future::ready(10),
1102 future::ready(11),
1103 future::ready(12),
1104 future::ready(13),
1105 ];
1106 let mesh = ValueMesh::new(region.clone(), futs).unwrap();
1107
1108 let joined = block_on(mesh.join());
1109 assert_eq!(joined.region().num_ranks(), 4);
1110 assert_eq!(joined.values().collect::<Vec<_>>(), vec![10, 11, 12, 13]);
1111 }
1112
1113 #[test]
1114 fn collect_mesh_ok() {
1115 let region: Region = extent!(x = 2, y = 3).into();
1116 let mesh = (0..6)
1117 .collect_mesh::<ValueMesh<_>>(region.clone())
1118 .expect("collect_mesh should succeed");
1119
1120 assert_eq!(mesh.region().num_ranks(), 6);
1121 assert_eq!(mesh.values().collect::<Vec<_>>(), vec![0, 1, 2, 3, 4, 5]);
1122 }
1123
1124 #[test]
1125 fn collect_mesh_len_too_short_is_error() {
1126 let region: Region = extent!(x = 2, y = 3).into();
1127 let err = (0..5).collect_mesh::<ValueMesh<_>>(region).unwrap_err();
1128
1129 match err {
1130 crate::v1::Error::InvalidRankCardinality { expected, actual } => {
1131 assert_eq!(expected, 6);
1132 assert_eq!(actual, 5);
1133 }
1134 other => panic!("unexpected error: {other:?}"),
1135 }
1136 }
1137
1138 #[test]
1139 fn collect_mesh_len_too_long_is_error() {
1140 let region: Region = extent!(x = 2, y = 3).into();
1141 let err = (0..7).collect_mesh::<ValueMesh<_>>(region).unwrap_err();
1142 match err {
1143 crate::v1::Error::InvalidRankCardinality { expected, actual } => {
1144 assert_eq!(expected, 6);
1145 assert_eq!(actual, 7);
1146 }
1147 other => panic!("unexpected error: {other:?}"),
1148 }
1149 }
1150
1151 #[test]
1152 fn collect_mesh_from_map_pipeline() {
1153 let region: Region = extent!(x = 2, y = 2).into();
1154 let mesh = (0..4)
1155 .map(|i| i * 10)
1156 .collect_mesh::<ValueMesh<_>>(region.clone())
1157 .unwrap();
1158
1159 assert_eq!(mesh.region().num_ranks(), 4);
1160 assert_eq!(mesh.values().collect::<Vec<_>>(), vec![0, 10, 20, 30]);
1161 }
1162
1163 #[test]
1164 fn collect_exact_mesh_ok() {
1165 let region: Region = extent!(x = 2, y = 3).into();
1166 let mesh = (0..6)
1167 .collect_exact_mesh::<ValueMesh<_>>(region.clone())
1168 .expect("collect_exact_mesh should succeed");
1169
1170 assert_eq!(mesh.region().num_ranks(), 6);
1171 assert_eq!(mesh.values().collect::<Vec<_>>(), vec![0, 1, 2, 3, 4, 5]);
1172 }
1173
1174 #[test]
1175 fn collect_exact_mesh_len_too_short_is_error() {
1176 let region: Region = extent!(x = 2, y = 3).into();
1177 let err = (0..5)
1178 .collect_exact_mesh::<ValueMesh<_>>(region)
1179 .unwrap_err();
1180
1181 match err {
1182 crate::v1::Error::InvalidRankCardinality { expected, actual } => {
1183 assert_eq!(expected, 6);
1184 assert_eq!(actual, 5);
1185 }
1186 other => panic!("unexpected error: {other:?}"),
1187 }
1188 }
1189
1190 #[test]
1191 fn collect_exact_mesh_len_too_long_is_error() {
1192 let region: Region = extent!(x = 2, y = 3).into();
1193 let err = (0..7)
1194 .collect_exact_mesh::<ValueMesh<_>>(region)
1195 .unwrap_err();
1196
1197 match err {
1198 crate::v1::Error::InvalidRankCardinality { expected, actual } => {
1199 assert_eq!(expected, 6);
1200 assert_eq!(actual, 7);
1201 }
1202 other => panic!("unexpected error: {other:?}"),
1203 }
1204 }
1205
1206 #[test]
1207 fn collect_exact_mesh_from_map_pipeline() {
1208 let region: Region = extent!(x = 2, y = 2).into();
1209 let mesh = (0..4)
1210 .map(|i| i * 10)
1211 .collect_exact_mesh::<ValueMesh<_>>(region.clone())
1212 .unwrap();
1213
1214 assert_eq!(mesh.region().num_ranks(), 4);
1215 assert_eq!(mesh.values().collect::<Vec<_>>(), vec![0, 10, 20, 30]);
1216 }
1217
1218 #[test]
1219 fn collect_indexed_ok_shuffled() {
1220 let region: Region = extent!(x = 2, y = 3).into();
1221 // (rank, value) in shuffled order; values = rank * 10
1222 let pairs = vec![(3, 30), (0, 0), (5, 50), (2, 20), (1, 10), (4, 40)];
1223 let mesh = pairs
1224 .into_iter()
1225 .collect_indexed::<ValueMesh<_>>(region.clone())
1226 .unwrap();
1227
1228 assert_eq!(mesh.region().num_ranks(), 6);
1229 assert_eq!(
1230 mesh.values().collect::<Vec<_>>(),
1231 vec![0, 10, 20, 30, 40, 50]
1232 );
1233 }
1234
1235 #[test]
1236 fn collect_indexed_missing_rank_is_error() {
1237 let region: Region = extent!(x = 2, y = 2).into(); // 4
1238 // Missing rank 3
1239 let pairs = vec![(0, 100), (1, 101), (2, 102)];
1240 let err = pairs
1241 .into_iter()
1242 .collect_indexed::<ValueMesh<_>>(region)
1243 .unwrap_err();
1244
1245 match err {
1246 crate::v1::Error::InvalidRankCardinality { expected, actual } => {
1247 assert_eq!(expected, 4);
1248 assert_eq!(actual, 3); // Distinct ranks seen.
1249 }
1250 other => panic!("unexpected error: {other:?}"),
1251 }
1252 }
1253
1254 #[test]
1255 fn collect_indexed_out_of_bounds_is_error() {
1256 let region: Region = extent!(x = 2, y = 2).into(); // 4 (valid ranks 0..=3)
1257 let pairs = vec![(0, 1), (4, 9)]; // 4 is out-of-bounds
1258 let err = pairs
1259 .into_iter()
1260 .collect_indexed::<ValueMesh<_>>(region)
1261 .unwrap_err();
1262
1263 match err {
1264 crate::v1::Error::InvalidRankCardinality { expected, actual } => {
1265 assert_eq!(expected, 4);
1266 assert_eq!(actual, 5); // offending index + 1
1267 }
1268 other => panic!("unexpected error: {other:?}"),
1269 }
1270 }
1271
1272 #[test]
1273 fn collect_indexed_duplicate_last_write_wins() {
1274 let region: Region = extent!(x = 1, y = 3).into(); // 3
1275 // rank 1 appears twice; last value should stick
1276 let pairs = vec![(0, 7), (1, 8), (1, 88), (2, 9)];
1277 let mesh = pairs
1278 .into_iter()
1279 .collect_indexed::<ValueMesh<_>>(region.clone())
1280 .unwrap();
1281
1282 assert_eq!(mesh.values().collect::<Vec<_>>(), vec![7, 88, 9]);
1283 }
1284
1285    // Naïve reference implementation of the indexed collector.
1286 #[allow(clippy::result_large_err)]
1287 fn build_value_mesh_indexed<T>(
1288 region: Region,
1289 pairs: impl IntoIterator<Item = (usize, T)>,
1290 ) -> crate::v1::Result<ValueMesh<T>> {
1291 let n = region.num_ranks();
1292
1293 // Buffer for exactly n slots; fill by rank.
1294 let mut buf: Vec<Option<T>> = std::iter::repeat_with(|| None).take(n).collect();
1295 let mut filled = 0usize;
1296
1297 for (rank, value) in pairs {
1298 if rank >= n {
1299 // Out-of-bounds: report `expected` = n, `actual` =
1300 // offending index + 1; i.e. number of ranks implied
1301 // so far.
1302 return Err(crate::v1::Error::InvalidRankCardinality {
1303 expected: n,
1304 actual: rank + 1,
1305 });
1306 }
1307 if buf[rank].is_none() {
1308 filled += 1;
1309 }
1310 buf[rank] = Some(value); // Last write wins.
1311 }
1312
1313 if filled != n {
1314 // Missing ranks: actual = number of distinct ranks seen.
1315 return Err(crate::v1::Error::InvalidRankCardinality {
1316 expected: n,
1317 actual: filled,
1318 });
1319 }
1320
1321 // All present and in-bounds: unwrap and build unchecked.
1322 let ranks: Vec<T> = buf.into_iter().map(Option::unwrap).collect();
1323 Ok(ValueMesh::new_unchecked(region, ranks))
1324 }
1325
1326 /// This uses the bit-mixing portion of Sebastiano Vigna's
1327 /// [SplitMix64 algorithm](https://prng.di.unimi.it/splitmix64.c)
1328 /// to generate a high-quality 64-bit hash from a usize index.
1329 /// Unlike the full SplitMix64 generator, this is stateless - we
1330 /// accept an arbitrary x as input and apply the mix function to
1331 /// turn `x` deterministically into a "randomized" u64. input
1332    /// turn `x` deterministically into a "randomized" u64. The same
1333    /// input always produces the same output.
1334 let mut z = x as u64 ^ 0x9E3779B97F4A7C15;
1335 z = (z ^ (z >> 30)).wrapping_mul(0xBF58476D1CE4E5B9);
1336 z = (z ^ (z >> 27)).wrapping_mul(0x94D049BB133111EB);
1337 z ^ (z >> 31)
1338 }
1339
1340 /// Shuffle a slice deterministically, using a hash of indices as
1341 /// the key.
1342 ///
1343 /// Each position `i` is assigned a pseudo-random 64-bit key (from
1344 /// `key(i)`), the slice is sorted by those keys, and the
1345 /// resulting permutation is applied in place.
1346 ///
1347 /// The permutation is fully determined by the sequence of indices
1348 /// `0..n` and the chosen `key` function. Running it twice on the
1349 /// same input yields the same "random-looking" arrangement.
1350 ///
1351    /// This is used by the property tests below: it gives
1352 /// the effect of a shuffle without introducing global RNG state,
1353 /// and ensures that duplicate elements are still ordered
1354 /// consistently (so we can test "last write wins" semantics in
1355 /// collectors).
1356 fn pseudo_shuffle<'a, T: 'a>(v: &'a mut [T], key: impl Fn(usize) -> u64 + Copy) {
1357 // Build perm.
1358 let mut with_keys: Vec<(u64, usize)> = (0..v.len()).map(|i| (key(i), i)).collect();
1359 with_keys.sort_by_key(|&(k, _)| k);
1360 let perm: Vec<usize> = with_keys.into_iter().map(|(_, i)| i).collect();
1361
1362    // In-place permutation using a cycle-based approach (e.g.
1363 // https://www.geeksforgeeks.org/dsa/permute-the-elements-of-an-array-following-given-order/).
1364 let mut seen = vec![false; v.len()];
1365 for i in 0..v.len() {
1366 if seen[i] {
1367 continue;
1368 }
1369 let mut a = i;
1370 while !seen[a] {
1371 seen[a] = true;
1372 let b = perm[a];
1373 // Short circuit on the cycle's start index.
1374 if b == i {
1375 break;
1376 }
1377 v.swap(a, b);
1378 a = b;
1379 }
1380 }
1381 }
1382
1383 // Property: Optimized and reference collectors yield the same
1384 // `ValueMesh` on complete inputs, even with duplicates.
1385 //
1386 // - Begin with a complete set of `(rank, value)` pairs covering
1387 // all ranks of the region.
1388 // - Add extra pairs at arbitrary ranks (up to `extra_len`), which
1389 // necessarily duplicate existing entries when `extra_len > 0`.
1390 // - Shuffle the combined pairs deterministically.
1391 // - Collect using both the reference (`try_collect_indexed`) and
1392 // optimized (`try_collect_indexed_opt`) implementations.
1393 //
1394 // Both collectors must succeed and produce identical results.
1395 // This demonstrates that the optimized version preserves
1396 // last-write-wins semantics and agrees exactly with the reference
1397 // behavior.
1398 proptest! {
1399 #[test]
1400 fn try_collect_opt_equivalence(region in gen_region(1..=4, 6), extra_len in 0usize..=12) {
1401 let n = region.num_ranks();
1402
1403 // Start with one pair per rank (coverage guaranteed).
1404 let mut pairs: Vec<(usize, i64)> = (0..n).map(|r| (r, r as i64)).collect();
1405
1406 // Add some extra duplicates of random in-bounds ranks.
1407 // Their values differ so last-write-wins is observable.
1408 let extras = proptest::collection::vec(0..n, extra_len)
1409 .new_tree(&mut proptest::test_runner::TestRunner::default())
1410 .unwrap()
1411 .current();
1412 for (k, r) in extras.into_iter().enumerate() {
1413 pairs.push((r, (n as i64) + (k as i64)));
1414 }
1415
1416 // Deterministic "shuffle" to fix iteration order across
1417 // both collectors.
1418 pseudo_shuffle(&mut pairs, hash_key);
1419
1420 // Reference vs optimized.
1421 let mesh_ref = build_value_mesh_indexed(region.clone(), pairs.clone()).unwrap();
1422 let mesh_opt = pairs.into_iter().collect_indexed::<ValueMesh<_>>(region.clone()).unwrap();
1423
1424 prop_assert_eq!(mesh_ref.region(), mesh_opt.region());
1425 prop_assert_eq!(mesh_ref.values().collect::<Vec<_>>(), mesh_opt.values().collect::<Vec<_>>());
1426 }
1427 }
1428
1429 // Property: Optimized and reference collectors report identical
1430 // errors when ranks are missing.
1431 //
1432 // - Begin with a complete set of `(rank, value)` pairs.
1433 // - Remove one rank so coverage is incomplete.
1434 // - Shuffle deterministically.
1435 // - Collect with both implementations.
1436 //
1437 // Both must fail with `InvalidRankCardinality` describing the
1438 // same expected vs. actual counts.
1439 proptest! {
1440 #[test]
1441 fn try_collect_opt_missing_rank_errors_match(region in gen_region(1..=4, 6)) {
1442 let n = region.num_ranks();
1443 // Base complete.
1444 let mut pairs: Vec<(usize, i64)> = (0..n).map(|r| (r, r as i64)).collect();
1445 // Drop one distinct rank.
1446 if n > 0 {
1447 let drop_idx = 0usize; // Deterministic, fine for the property.
1448 pairs.remove(drop_idx);
1449 }
1450 // Shuffle deterministically.
1451 pseudo_shuffle(&mut pairs, hash_key);
1452
1453 let ref_err = build_value_mesh_indexed(region.clone(), pairs.clone()).unwrap_err();
1454 let opt_err = pairs.into_iter().collect_indexed::<ValueMesh<_>>(region).unwrap_err();
1455 assert_eq!(format!("{ref_err:?}"), format!("{opt_err:?}"));
1456 }
1457 }
1458
1459 // Property: Optimized and reference collectors report identical
1460 // errors when given out-of-bounds ranks.
1461 //
1462 // - Construct a set of `(rank, value)` pairs.
1463 // - Include at least one pair whose rank is ≥
1464 // `region.num_ranks()`.
1465 // - Shuffle deterministically.
1466 // - Collect with both implementations.
1467 //
1468 // Both must fail with `InvalidRankCardinality`, and the reported
1469 // error values must match exactly.
1470 proptest! {
1471 #[test]
1472 fn try_collect_opt_out_of_bound_errors_match(region in gen_region(1..=4, 6)) {
1473 let n = region.num_ranks();
1474 // One valid, then one out-of-bound.
1475 let mut pairs = vec![(0usize, 0i64), (n, 123i64)];
1476 pseudo_shuffle(&mut pairs, hash_key);
1477
1478 let ref_err = build_value_mesh_indexed(region.clone(), pairs.clone()).unwrap_err();
1479 let opt_err = pairs.into_iter().collect_indexed::<ValueMesh<_>>(region).unwrap_err();
1480 assert_eq!(format!("{ref_err:?}"), format!("{opt_err:?}"));
1481 }
1482 }
1483
1484 #[test]
1485 fn map_into_preserves_region_and_order() {
1486 let region: Region = extent!(rows = 2, cols = 3).into();
1487 let vm = ValueMesh::new_unchecked(region.clone(), vec![0, 1, 2, 3, 4, 5]);
1488
1489 let doubled: ValueMesh<_> = vm.map_into(|x| x * 2);
1490 assert_eq!(doubled.region, region);
1491 assert_eq!(
1492 doubled.values().collect::<Vec<_>>(),
1493 vec![0, 2, 4, 6, 8, 10]
1494 );
1495 }
1496
1497 #[test]
1498 fn map_into_ref_borrows_and_preserves() {
1499 let region: Region = extent!(n = 4).into();
1500 let vm = ValueMesh::new_unchecked(
1501 region.clone(),
1502 vec!["a".to_string(), "b".into(), "c".into(), "d".into()],
1503 );
1504
1505 let lens: ValueMesh<_> = vm.map_into(|s| s.len());
1506 assert_eq!(lens.region, region);
1507 assert_eq!(lens.values().collect::<Vec<_>>(), vec![1, 1, 1, 1]);
1508 }
1509
1510 #[test]
1511 fn try_map_into_short_circuits_on_error() {
1512 let region = extent!(n = 4).into();
1513 let vm = ValueMesh::new_unchecked(region, vec![1, 2, 3, 4]);
1514
1515 let res: Result<ValueMesh<i32>, &'static str> =
1516 vm.try_map_into(|x| if x == &3 { Err("boom") } else { Ok(x + 10) });
1517
1518 assert!(res.is_err());
1519 assert_eq!(res.unwrap_err(), "boom");
1520 }
1521
1522 #[test]
1523 fn try_map_into_ref_short_circuits_on_error() {
1524 let region = extent!(n = 4).into();
1525 let vm = ValueMesh::new_unchecked(region, vec![1, 2, 3, 4]);
1526
1527 let res: Result<ValueMesh<i32>, &'static str> =
1528 vm.try_map_into(|x| if x == &3 { Err("boom") } else { Ok(x + 10) });
1529
1530 assert!(res.is_err());
1531 assert_eq!(res.unwrap_err(), "boom");
1532 }
1533
1534 // -- Helper to poll `core::future::Ready` without a runtime
1535 fn noop_waker() -> Waker {
1536 fn clone(_: *const ()) -> RawWaker {
1537 RawWaker::new(std::ptr::null(), &VTABLE)
1538 }
1539 fn wake(_: *const ()) {}
1540 fn wake_by_ref(_: *const ()) {}
1541 fn drop(_: *const ()) {}
1542 static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, wake, wake_by_ref, drop);
1543 // SAFETY: The raw waker never dereferences its data pointer
1544 // (`null`), and all vtable fns are no-ops. It's only used to
1545 // satisfy `Context` for polling already-ready futures in
1546 // tests.
1547 unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
1548 }
1549
1550 fn poll_now<F: Future>(mut fut: F) -> F::Output {
1551 let waker = noop_waker();
1552 let mut cx = Context::from_waker(&waker);
1553 // SAFETY: `fut` is a local stack variable that we never move
1554 // after pinning, and we only use it to poll immediately
1555 // within this scope. This satisfies the invariants of
1556 // `Pin::new_unchecked`.
1557 let mut fut = unsafe { Pin::new_unchecked(&mut fut) };
1558 match fut.as_mut().poll(&mut cx) {
1559 Poll::Ready(v) => v,
1560 Poll::Pending => unreachable!("Ready futures must complete immediately"),
1561 }
1562 }
1563 // --
1564
1565 #[test]
1566 fn map_into_ready_futures() {
1567 let region: Region = extent!(r = 2, c = 2).into();
1568 let vm = ValueMesh::new_unchecked(region.clone(), vec![10, 20, 30, 40]);
1569
1570 // Map to `core::future::Ready` futures.
1571 let pending: ValueMesh<core::future::Ready<_>> =
1572 vm.map_into(|x| core::future::ready(x + 1));
1573 assert_eq!(pending.region, region);
1574
1575 // Drive the ready futures without a runtime and collect results.
1576 let results: Vec<_> = pending.values().map(|f| poll_now(f.clone())).collect();
1577 assert_eq!(results, vec![11, 21, 31, 41]);
1578 }
1579
1580 #[test]
1581 fn map_into_single_element_mesh() {
1582 let region: Region = extent!(n = 1).into();
1583 let vm = ValueMesh::new_unchecked(region.clone(), vec![7]);
1584
1585 let out: ValueMesh<_> = vm.map_into(|x| x * x);
1586 assert_eq!(out.region, region);
1587 assert_eq!(out.values().collect::<Vec<_>>(), vec![49]);
1588 }
1589
1590 #[test]
1591 fn map_into_ref_with_non_clone_field() {
1592 // A type that intentionally does NOT implement Clone.
1593 #[derive(Debug, PartialEq, Eq)]
1594 struct NotClone(i32);
1595
1596 let region: Region = extent!(x = 3).into();
1597 let values = vec![(10, NotClone(1)), (20, NotClone(2)), (30, NotClone(3))];
1598 let mesh: ValueMesh<(i32, NotClone)> =
1599 values.into_iter().collect_mesh(region.clone()).unwrap();
1600
1601 let projected: ValueMesh<i32> = mesh.map_into(|t| t.0);
1602 assert_eq!(projected.values().collect::<Vec<_>>(), vec![10, 20, 30]);
1603        assert_eq!(projected.region(), &region);
1604 }
1605
1606 #[test]
1607 fn rle_roundtrip_all_equal() {
1608 let region: Region = extent!(n = 6).into();
1609 let mut vm = ValueMesh::new_unchecked(region.clone(), vec![42; 6]);
1610
1611 // Compress and ensure logical equality preserved.
1612 vm.compress_adjacent_in_place();
1613 let collected: Vec<_> = vm.values().collect();
1614 assert_eq!(collected, vec![42, 42, 42, 42, 42, 42]);
1615
1616 // Random access still works.
1617 for i in 0..region.num_ranks() {
1618 assert_eq!(vm.get(i), Some(&42));
1619 }
1620 assert_eq!(vm.get(region.num_ranks()), None); // out-of-bounds
1621 }
1622
1623 #[test]
1624 fn rle_roundtrip_alternating() {
1625 let region: Region = extent!(n = 6).into();
1626 let original = vec![1, 2, 1, 2, 1, 2];
1627 let mut vm = ValueMesh::new_unchecked(region.clone(), original.clone());
1628
1629 vm.compress_adjacent_in_place();
1630 let collected: Vec<_> = vm.values().collect();
1631 assert_eq!(collected, original);
1632
1633 // Spot-check random access after compression.
1634 assert_eq!(vm.get(0), Some(&1));
1635 assert_eq!(vm.get(1), Some(&2));
1636 assert_eq!(vm.get(3), Some(&2));
1637 assert_eq!(vm.get(5), Some(&2));
1638 }
1639
1640 #[test]
1641 fn rle_roundtrip_blocky_and_slice() {
1642 // Blocks: 0,0,0 | 1,1 | 2,2,2,2 | 3
1643 let region: Region = extent!(n = 10).into();
1644 let original = vec![0, 0, 0, 1, 1, 2, 2, 2, 2, 3];
1645 let mut vm = ValueMesh::new_unchecked(region.clone(), original.clone());
1646
1647 vm.compress_adjacent_in_place();
1648 let collected: Vec<_> = vm.values().collect();
1649 assert_eq!(collected, original);
1650
1651 // Slice a middle subregion [3..8) → [1,1,2,2,2]
1652 let sub_region = region.range("n", 3..8).unwrap();
1653 let sliced = vm.sliced(sub_region);
1654 let sliced_vec: Vec<_> = sliced.values().collect();
1655 assert_eq!(sliced_vec, vec![1, 1, 2, 2, 2]);
1656 }
1657
1658 #[test]
1659 fn rle_idempotent_noop_on_second_call() {
1660 let region: Region = extent!(n = 7).into();
1661 let original = vec![9, 9, 9, 8, 8, 9, 9];
1662 let mut vm = ValueMesh::new_unchecked(region.clone(), original.clone());
1663
1664 vm.compress_adjacent_in_place();
1665 let once: Vec<_> = vm.values().collect();
1666 assert_eq!(once, original);
1667
1668 // Calling again should be a no-op and still yield identical
1669 // values.
1670 vm.compress_adjacent_in_place();
1671 let twice: Vec<_> = vm.values().collect();
1672 assert_eq!(twice, original);
1673 }
1674
    #[test]
    fn rle_works_after_build_indexed() {
        // Build with shuffled pairs, then compress and verify
        // semantics.
        let region: Region = extent!(x = 2, y = 3).into(); // 6 ranks
        let pairs = vec![(3, 30), (0, 0), (5, 50), (2, 20), (1, 10), (4, 40)];
        let mut vm = pairs
            .into_iter()
            .collect_indexed::<ValueMesh<_>>(region.clone())
            .unwrap();

        // Should compress to 6 runs of length 1; still must
        // round-trip.
        vm.compress_adjacent_in_place();
        let collected: Vec<_> = vm.values().collect();
        assert_eq!(collected, vec![0, 10, 20, 30, 40, 50]);
        // Spot-check get()
        assert_eq!(vm.get(4), Some(&40));
    }

    #[test]
    fn rle_handles_singleton_mesh() {
        let region: Region = extent!(n = 1).into();
        let mut vm = ValueMesh::new_unchecked(region.clone(), vec![123]);

        vm.compress_adjacent_in_place();
        let collected: Vec<_> = vm.values().collect();
        assert_eq!(collected, vec![123]);
        assert_eq!(vm.get(0), Some(&123));
        assert_eq!(vm.get(1), None);
    }

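    // Sketch (illustrative) for the degenerate empty mesh: with zero
    // ranks the completeness invariant is vacuous, so construction,
    // compression, iteration, and indexing are assumed to behave as
    // no-ops, mirroring the empty-region case in
    // `build_from_ranges_with_default_edge_cases`.
    #[test]
    fn rle_handles_empty_mesh_sketch() {
        let region: Region = extent!(n = 0).into();
        let mut vm = ValueMesh::new(region, Vec::<i32>::new()).unwrap();

        vm.compress_adjacent_in_place();
        assert_eq!(vm.values().collect::<Vec<_>>(), Vec::<i32>::new());
        assert_eq!(vm.get(0), None);
    }
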
    #[test]
    fn test_dense_round_trip() {
        // Build a simple dense mesh of 5 integers.
        let region: Region = extent!(x = 5).into();
        let dense = ValueMesh::new(region.clone(), vec![1, 2, 3, 4, 5]).unwrap();

        let json = serde_json::to_string_pretty(&dense).unwrap();
        let restored: ValueMesh<i32> = serde_json::from_str(&json).unwrap();

        assert_eq!(dense, restored);

        // Dense meshes should stay dense on the wire: check the
        // tagged variant.
        let v: serde_json::Value = serde_json::from_str(&json).unwrap();
        // enum tag is nested: {"rep": {"rep":"dense", ...}}
        let tag = v
            .get("rep")
            .and_then(|o| o.get("rep"))
            .and_then(|s| s.as_str());
        assert_eq!(tag, Some("dense"));
    }

    #[test]
    fn test_compressed_round_trip() {
        // Build a dense mesh, compress it, and verify it stays
        // compressed on the wire.
        let region: Region = extent!(x = 10).into();
        let mut mesh = ValueMesh::new(region.clone(), vec![1, 1, 1, 2, 2, 3, 3, 3, 3, 3]).unwrap();
        mesh.compress_adjacent_in_place();

        let json = serde_json::to_string_pretty(&mesh).unwrap();
        let restored: ValueMesh<i32> = serde_json::from_str(&json).unwrap();

        // Logical equality preserved.
        assert_eq!(mesh, restored);

        // Compressed meshes should stay compressed on the wire.
        let v: serde_json::Value = serde_json::from_str(&json).unwrap();
        // enum tag is nested: {"rep": {"rep":"compressed", ...}}
        let tag = v
            .get("rep")
            .and_then(|o| o.get("rep"))
            .and_then(|s| s.as_str());
        assert_eq!(tag, Some("compressed"));
    }

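    // Sketch (illustrative): the deserialized form of a compressed
    // mesh should be usable like the original, i.e. random access and
    // iteration give the same answers. Relies only on APIs already
    // exercised in `test_compressed_round_trip`.
    #[test]
    fn test_compressed_round_trip_random_access_sketch() {
        let region: Region = extent!(x = 10).into();
        let original = vec![1, 1, 1, 2, 2, 3, 3, 3, 3, 3];
        let mut mesh = ValueMesh::new(region, original.clone()).unwrap();
        mesh.compress_adjacent_in_place();

        let json = serde_json::to_string(&mesh).unwrap();
        let restored: ValueMesh<i32> = serde_json::from_str(&json).unwrap();

        assert_eq!(restored.values().collect::<Vec<_>>(), original);
        assert_eq!(restored.get(0), Some(&1));
        assert_eq!(restored.get(4), Some(&2));
        assert_eq!(restored.get(9), Some(&3));
        assert_eq!(restored.get(10), None);
    }
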
    #[test]
    fn test_stable_run_encoding() {
        let run = Run::new(0, 10, 42);
        let json = serde_json::to_string(&run).unwrap();
        let decoded: Run = serde_json::from_str(&json).unwrap();

        assert_eq!(run, decoded);
        assert_eq!(run.start, 0);
        assert_eq!(run.end, 10);
        assert_eq!(run.id, 42);

        // Ensure conversion back to Range<usize> works.
        let (range, id): (Range<usize>, u32) = run.try_into().unwrap();
        assert_eq!(range, 0..10);
        assert_eq!(id, 42);
    }

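    // Sketch (illustrative), assuming the plain derived encoding of
    // `Run` (no serde renames): the JSON field names `start`, `end`,
    // and `id` are themselves part of the stable wire format.
    #[test]
    fn test_run_json_field_names_sketch() {
        let run = Run::new(3, 7, 1);
        let v: serde_json::Value = serde_json::to_value(&run).unwrap();

        assert_eq!(v.get("start").and_then(|x| x.as_u64()), Some(3));
        assert_eq!(v.get("end").and_then(|x| x.as_u64()), Some(7));
        assert_eq!(v.get("id").and_then(|x| x.as_u64()), Some(1));
    }
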
    #[test]
    fn from_single_builds_single_run() {
        let region: Region = extent!(n = 6).into();
        let vm = ValueMesh::from_single(region.clone(), 7);

        assert_eq!(vm.region(), &region);
        assert_eq!(vm.values().collect::<Vec<_>>(), vec![7, 7, 7, 7, 7, 7]);
        assert_eq!(vm.get(0), Some(&7));
        assert_eq!(vm.get(5), Some(&7));
        assert_eq!(vm.get(6), None);
    }

    #[test]
    fn from_default_builds_with_default_value() {
        let region: Region = extent!(n = 6).into();
        let vm = ValueMesh::<i32>::from_default(region.clone());

        assert_eq!(vm.region(), &region);
        // i32::default() == 0
        assert_eq!(vm.values().collect::<Vec<_>>(), vec![0, 0, 0, 0, 0, 0]);
        assert_eq!(vm.get(0), Some(&0));
        assert_eq!(vm.get(5), Some(&0));
    }

    #[test]
    fn test_default_vs_single_equivalence() {
        let region: Region = extent!(x = 4).into();
        let d1 = ValueMesh::<i32>::from_default(region.clone());
        let d2 = ValueMesh::from_single(region.clone(), 0);
        assert_eq!(d1, d2);
    }

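    // Sketch (illustrative): with no override ranges,
    // `from_ranges_with_default` should be logically indistinguishable
    // from `from_single`. Equality is checked via `values()` rather
    // than `==` so the assertion does not depend on the internal
    // representation the two constructors happen to choose.
    #[test]
    fn test_ranges_with_default_vs_single_sketch() {
        let region: Region = extent!(x = 4).into();
        let a = ValueMesh::from_ranges_with_default(region.clone(), 5, vec![]).unwrap();
        let b = ValueMesh::from_single(region, 5);
        assert_eq!(
            a.values().collect::<Vec<_>>(),
            b.values().collect::<Vec<_>>()
        );
    }
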
    #[test]
    fn build_from_ranges_with_default_basic() {
        let region: Region = extent!(n = 10).into();
        let vm = ValueMesh::from_ranges_with_default(
            region.clone(),
            0, // default
            vec![(2..4, 1), (6..9, 2)],
        )
        .unwrap();

        assert_eq!(vm.region(), &region);
        assert_eq!(
            vm.values().collect::<Vec<_>>(),
            vec![0, 0, 1, 1, 0, 0, 2, 2, 2, 0]
        );

        // Internal shape: [0..2)->0, [2..4)->1, [4..6)->0, [6..9)->2,
        // [9..10)->0
        if let Rep::Compressed { table, runs } = &vm.rep {
            // Table is small and de-duplicated.
            assert!(table.len() <= 3);
            assert_eq!(runs.len(), 5);
        } else {
            panic!("expected compressed");
        }
    }

    #[test]
    fn build_from_ranges_with_default_edge_cases() {
        let region: Region = extent!(n = 5).into();

        // Full override covers entire region.
        let vm = ValueMesh::from_ranges_with_default(region.clone(), 9, vec![(0..5, 3)]).unwrap();
        assert_eq!(vm.values().collect::<Vec<_>>(), vec![3, 3, 3, 3, 3]);

        // Adjacent overrides and default gaps.
        let vm = ValueMesh::from_ranges_with_default(region.clone(), 0, vec![(1..2, 7), (2..4, 7)])
            .unwrap();
        assert_eq!(vm.values().collect::<Vec<_>>(), vec![0, 7, 7, 7, 0]);

        // Empty region.
        let empty_region: Region = extent!(n = 0).into();
        let vm = ValueMesh::from_ranges_with_default(empty_region.clone(), 42, vec![]).unwrap();
        assert_eq!(vm.values().collect::<Vec<_>>(), Vec::<i32>::new());
    }

    #[test]
    fn from_dense_builds_and_compresses() {
        let region: Region = extent!(n = 6).into();
        let mesh = ValueMesh::from_dense(region.clone(), vec![1, 1, 2, 2, 3, 3]).unwrap();

        assert_eq!(mesh.region(), &region);
        assert!(matches!(mesh.rep, Rep::Compressed { .. }));
        assert_eq!(mesh.values().collect::<Vec<_>>(), vec![1, 1, 2, 2, 3, 3]);

        // Spot-check indexing.
        assert_eq!(mesh.get(0), Some(&1));
        assert_eq!(mesh.get(3), Some(&2));
        assert_eq!(mesh.get(5), Some(&3));
    }

    #[test]
    fn merge_from_overlay_basic() {
        // Base mesh with three contiguous runs.
        let region: Region = extent!(n = 8).into();
        let mut mesh = ValueMesh::from_dense(region.clone(), vec![1, 1, 1, 2, 2, 2, 3, 3]).unwrap();

        // Overlay replaces middle segment [2..6) with 9s.
        let overlay = ValueOverlay::try_from_runs(vec![(2..6, 9)]).unwrap();

        mesh.merge_from_overlay(overlay).unwrap();

        // Materialize back into ranges to inspect.
        let out = mesh.materialized_runs();

        // Expected: left prefix (0..2)=1, replaced middle (2..6)=9, tail (6..8)=3.
        assert_eq!(out, vec![(0..2, 1), (2..6, 9), (6..8, 3)]);
    }

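    // Sketch (illustrative): an overlay that lines up exactly with an
    // existing run boundary should simply replace that run, leaving
    // its neighbors untouched. Uses the same base mesh shape and the
    // same APIs as `merge_from_overlay_basic`.
    #[test]
    fn merge_from_overlay_run_aligned_sketch() {
        let region: Region = extent!(n = 8).into();
        let mut mesh = ValueMesh::from_dense(region, vec![1, 1, 1, 2, 2, 2, 3, 3]).unwrap();

        // Overlay [3..6) coincides exactly with the 2-run.
        let overlay = ValueOverlay::try_from_runs(vec![(3..6, 9)]).unwrap();
        mesh.merge_from_overlay(overlay).unwrap();

        assert_eq!(
            mesh.materialized_runs(),
            vec![(0..3, 1), (3..6, 9), (6..8, 3)]
        );
    }
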
    #[test]
    fn merge_from_overlay_multiple_spans() {
        // Build a mesh with alternating runs.
        let region: Region = extent!(m = 12).into();
        let mut mesh =
            ValueMesh::from_dense(region.clone(), vec![1, 1, 1, 2, 2, 2, 3, 3, 3, 4, 4, 4])
                .unwrap();

        // The overlay has one run that spans the boundary between two
        // of the base runs, plus a second, disjoint run later on.
        let overlay = ValueOverlay::try_from_runs(vec![(2..6, 9), (9..11, 8)]).unwrap();

        mesh.merge_from_overlay(overlay).unwrap();
        let out = mesh.materialized_runs();

        // Expected after merge and re-compression:
        //   (0..2, 1)   untouched prefix
        //   (2..6, 9)   overwrites the tail of the 1-run and all of the 2-run
        //   (6..9, 3)   base 3-run survives untouched
        //   (9..11, 8)  overwrite inside the 4-run
        //   (11..12, 4) leftover tail of the 4-run
        assert_eq!(
            out,
            vec![(0..2, 1), (2..6, 9), (6..9, 3), (9..11, 8), (11..12, 4)]
        );
    }

    #[test]
    fn merge_from_overlay_crosses_row_boundary() {
        // 2 x 5 region -> 10 linear ranks in row-major order.
        let region: Region = extent!(rows = 2, cols = 5).into();

        // Dense values laid out row-major:
        //   row 0: [1, 1, 1, 2, 2]
        //   row 1: [3, 3, 4, 4, 4]
        let mut mesh =
            ValueMesh::from_dense(region.clone(), vec![1, 1, 1, 2, 2, 3, 3, 4, 4, 4]).unwrap();

        // Overlay that crosses the row boundary:
        //   linear ranks [3..7) -> 9
        //   - tail of row 0: indices 3, 4 (the two 2s)
        //   - head of row 1: indices 5, 6 (the two 3s)
        let overlay = ValueOverlay::try_from_runs(vec![(3..7, 9)]).unwrap();

        mesh.merge_from_overlay(overlay).unwrap();

        // After merge, the dense view should be:
        //   [1,1,1, 9,9, 9,9, 4,4,4]
        let flat: Vec<_> = mesh.values().collect();
        assert_eq!(flat, vec![1, 1, 1, 9, 9, 9, 9, 4, 4, 4]);

        // And the materialized runs should reflect that:
        //   (0..3,1) | (3..7,9) | (7..10,4)
        let runs = mesh.materialized_runs();
        assert_eq!(runs, vec![(0..3, 1), (3..7, 9), (7..10, 4)]);
    }
}
1937}