hyperactor_mesh/v1/
value_mesh.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9use std::mem;
10use std::mem::MaybeUninit;
11use std::ptr;
12use std::ptr::NonNull;
13
14use futures::Future;
15use ndslice::view;
16use ndslice::view::Ranked;
17use ndslice::view::Region;
18
19/// A mesh of values, where each value is associated with a rank.
20///
21/// # Invariant
22/// The mesh is *complete*: `ranks.len()` always equals
23/// `region.num_ranks()`. Every rank in the region has exactly one
24/// associated value.
25#[derive(Clone, Debug, PartialEq, Eq, Hash)] // only if T implements
26pub struct ValueMesh<T> {
27    region: Region,
28    ranks: Vec<T>,
29}
30
31impl<T> ValueMesh<T> {
32    /// Creates a new `ValueMesh` for `region` with exactly one value
33    /// per rank.
34    ///
35    /// # Invariants
36    /// This constructor validates that the number of provided values
37    /// (`ranks.len()`) matches the region’s cardinality
38    /// (`region.num_ranks()`). A value mesh must be complete: every
39    /// rank in `region` has a corresponding `T`.
40    ///
41    /// # Errors
42    /// Returns [`Error::InvalidRankCardinality`] if `ranks.len() !=
43    /// region.num_ranks()`.
44    /// ```
45    pub(crate) fn new(region: Region, ranks: Vec<T>) -> crate::v1::Result<Self> {
46        let (actual, expected) = (ranks.len(), region.num_ranks());
47        if actual != expected {
48            return Err(crate::v1::Error::InvalidRankCardinality { expected, actual });
49        }
50        Ok(Self { region, ranks })
51    }
52
53    /// Constructs a `ValueMesh` without checking cardinality. Caller
54    /// must ensure `ranks.len() == region.num_ranks()`.
55    #[inline]
56    pub(crate) fn new_unchecked(region: Region, ranks: Vec<T>) -> Self {
57        debug_assert_eq!(region.num_ranks(), ranks.len());
58        Self { region, ranks }
59    }
60}
61
62impl<F: Future> ValueMesh<F> {
63    /// Await all futures in the mesh, yielding a `ValueMesh` of their
64    /// outputs.
65    pub async fn join(self) -> ValueMesh<F::Output> {
66        let ValueMesh { region, ranks } = self;
67        ValueMesh::new_unchecked(region, futures::future::join_all(ranks).await)
68    }
69}
70
71impl<T, E> ValueMesh<Result<T, E>> {
72    /// Transposes a `ValueMesh<Result<T, E>>` into a
73    /// `Result<ValueMesh<T>, E>`.
74    pub fn transpose(self) -> Result<ValueMesh<T>, E> {
75        let ValueMesh { region, ranks } = self;
76        let ranks = ranks.into_iter().collect::<Result<Vec<T>, E>>()?;
77        Ok(ValueMesh::new_unchecked(region, ranks))
78    }
79}
80
81impl<T: 'static> view::Ranked for ValueMesh<T> {
82    type Item = T;
83
84    fn region(&self) -> &Region {
85        &self.region
86    }
87
88    fn get(&self, rank: usize) -> Option<&Self::Item> {
89        self.ranks.get(rank)
90    }
91}
92
93impl<T: Clone + 'static> view::RankedSliceable for ValueMesh<T> {
94    fn sliced(&self, region: Region) -> Self {
95        debug_assert!(region.is_subset(self.region()), "sliced: not a subset");
96        let ranks: Vec<T> = self
97            .region()
98            .remap(&region)
99            .unwrap()
100            .map(|index| self.get(index).unwrap().clone())
101            .collect();
102        debug_assert_eq!(
103            region.num_ranks(),
104            ranks.len(),
105            "sliced: cardinality mismatch"
106        );
107        Self::new_unchecked(region, ranks)
108    }
109}
110
111impl<T> view::BuildFromRegion<T> for ValueMesh<T> {
112    type Error = crate::v1::Error;
113
114    fn build_dense(region: Region, values: Vec<T>) -> Result<Self, Self::Error> {
115        Self::new(region, values)
116    }
117
118    fn build_dense_unchecked(region: Region, values: Vec<T>) -> Self {
119        Self::new_unchecked(region, values)
120    }
121}
122
123impl<T> view::BuildFromRegionIndexed<T> for ValueMesh<T> {
124    type Error = crate::v1::Error;
125
126    fn build_indexed(
127        region: Region,
128        pairs: impl IntoIterator<Item = (usize, T)>,
129    ) -> Result<Self, Self::Error> {
130        let n = region.num_ranks();
131
132        // Allocate uninitialized buffer for T.
133        // Note: Vec<MaybeUninit<T>>'s Drop will only free the
134        // allocation; it never runs T's destructor. We must
135        // explicitly drop any initialized elements (DropGuard) or
136        // convert into Vec<T>.
137        let mut buf: Vec<MaybeUninit<T>> = Vec::with_capacity(n);
138        // SAFETY: set `len = n` to treat the buffer as n uninit slots
139        // of `MaybeUninit<T>`. We never read before `ptr::write`,
140        // drop only slots marked initialized (bitset), and convert to
141        // `Vec<T>` only once all 0..n are initialized (guard enforces
142        // this).
143        unsafe {
144            buf.set_len(n);
145        }
146
147        // Compact bitset for occupancy.
148        let words = n.div_ceil(64);
149        let mut bits = vec![0u64; words];
150        let mut filled = 0usize;
151
152        // Drop guard: cleans up initialized elements on early exit.
153        // Stores raw, non-borrowed pointers (`NonNull`), so we don’t
154        // hold Rust references for the whole scope. This allows
155        // mutating `buf`/`bits` inside the loop while still letting
156        // the guard access them if dropped early.
157        struct DropGuard<T> {
158            buf: NonNull<MaybeUninit<T>>,
159            bits: NonNull<u64>,
160            n_elems: usize,
161            n_words: usize,
162            disarm: bool,
163        }
164
165        impl<T> DropGuard<T> {
166            /// # Safety
167            /// - `buf` points to `buf.len()` contiguous
168            ///   `MaybeUninit<T>` and outlives the guard.
169            /// - `bits` points to `bits.len()` contiguous `u64` words
170            ///   and outlives the guard.
171            /// - For every set bit `i` in `bits`, `buf[i]` has been
172            ///   initialized with a valid `T` (and on duplicates, the
173            ///   previous `T` at `i` was dropped before overwrite).
174            /// - While the guard is alive, caller may mutate
175            ///   `buf`/`bits` (the guard holds only raw pointers; it
176            ///   does not create Rust borrows).
177            /// - On drop (when not disarmed), the guard will read
178            ///   `bits`, mask tail bits in the final word, and
179            ///   `drop_in_place` each initialized `buf[i]`—never
180            ///   accessing beyond `buf.len()` / `bits.len()`.
181            /// - If a slice is empty, the stored pointer may be
182            ///   `dangling()`; it is never dereferenced when the
183            ///   corresponding length is zero.
184            unsafe fn new(buf: &mut [MaybeUninit<T>], bits: &mut [u64]) -> Self {
185                let n_elems = buf.len();
186                let n_words = bits.len();
187                // Invariant typically: n_words == (n_elems + 63) / 64
188                // but we don't *require* it; tail is masked in Drop.
189                Self {
190                    buf: NonNull::new(buf.as_mut_ptr()).unwrap_or_else(NonNull::dangling),
191                    bits: NonNull::new(bits.as_mut_ptr()).unwrap_or_else(NonNull::dangling),
192                    n_elems,
193                    n_words,
194                    disarm: false,
195                }
196            }
197
198            #[inline]
199            fn disarm(&mut self) {
200                self.disarm = true;
201            }
202        }
203
204        impl<T> Drop for DropGuard<T> {
205            fn drop(&mut self) {
206                if self.disarm {
207                    return;
208                }
209
210                // SAFETY:
211                // - `self.buf` points to `n_elems` contiguous
212                //   `MaybeUninit<T>` slots (or may be dangling if
213                //   `n_elems == 0`); `self.bits` points to `n_words`
214                //   contiguous `u64` words (or may be dangling if
215                //   `n_words == 0`).
216                // - Loop bounds ensure `w < n_words` when reading
217                //   `*bits_base.add(w)`.
218                // - For the final word we mask unused tail bits so
219                //   any computed index `i = w * 64 + tz` always
220                //   satisfies `i < n_elems` before we dereference
221                //   `buf_base.add(i)`.
222                // - Only slots whose bits are set are dropped, so no
223                //   double-drops.
224                // - No aliasing with active Rust borrows: the guard
225                //   holds raw pointers and runs in `Drop` after the
226                //   fill loop.
227                unsafe {
228                    let buf_base = self.buf.as_ptr();
229                    let bits_base = self.bits.as_ptr();
230
231                    for w in 0..self.n_words {
232                        // Load word.
233                        let mut word = *bits_base.add(w);
234
235                        // Mask off bits beyond `n_elems` in the final
236                        // word (if any).
237                        if w == self.n_words.saturating_sub(1) {
238                            let used_bits = self.n_elems.saturating_sub(w * 64);
239                            if used_bits < 64 {
240                                let mask = if used_bits == 0 {
241                                    0
242                                } else {
243                                    (1u64 << used_bits) - 1
244                                };
245                                word &= mask;
246                            }
247                        }
248
249                        // Fast scan set bits.
250                        while word != 0 {
251                            let tz = word.trailing_zeros() as usize;
252                            let i = w * 64 + tz;
253                            debug_assert!(i < self.n_elems);
254
255                            let slot = buf_base.add(i);
256                            // Drop the initialized element.
257                            ptr::drop_in_place((*slot).as_mut_ptr());
258
259                            // clear the bit we just handled
260                            word &= word - 1;
261                        }
262                    }
263                }
264            }
265        }
266
267        // SAFETY:
268        // - `buf` and `bits` are freshly allocated Vecs with
269        //   capacity/len set to cover exactly `n_elems` and `n_words`,
270        //   so their `.as_mut_ptr()` is valid for that many elements.
271        // - Both slices live at least as long as the guard, and are
272        //   not moved until after the guard is disarmed.
273        // - No aliasing occurs: the guard holds only raw pointers and
274        //   the fill loop mutates through those same allocations.
275        let mut guard = unsafe { DropGuard::new(&mut buf, &mut bits) };
276
277        for (rank, value) in pairs {
278            // Single bounds check up front.
279            if rank >= guard.n_elems {
280                return Err(crate::v1::Error::InvalidRankCardinality {
281                    expected: guard.n_elems,
282                    actual: rank + 1,
283                });
284            }
285
286            // Compute word index and bit mask once.
287            let w = rank / 64;
288            let b = rank % 64;
289            let mask = 1u64 << b;
290
291            // SAFETY:
292            // - `rank < guard.n_elems` was checked above, so
293            //   `buf_slot = buf.add(rank)` is within the
294            //   `Vec<MaybeUninit<T>>` allocation.
295            // - `w = rank / 64` and `bits.len() == (n + 63) / 64`
296            //   ensure `bits_ptr = bits.add(w)` is in-bounds.
297            // - If `(word & mask) != 0`, then this slot was
298            //   previously initialized;
299            //   `drop_in_place((*buf_slot).as_mut_ptr())` is valid and
300            //   leaves the slot uninitialized.
301            // - If the bit was clear, we set it and count `filled +=
302            //   1`.
303            // - `(*buf_slot).write(value)` is valid in both cases:
304            //   either writing into an uninitialized slot or
305            //   immediately after dropping the prior `T`.
306            // - No aliasing with Rust references: the guard holds raw
307            //   pointers and we have exclusive ownership of
308            //   `buf`/`bits` within this function.
309            unsafe {
310                // Pointers from the guard (no long-lived & borrows).
311                let bits_ptr = guard.bits.as_ptr().add(w);
312                let buf_slot = guard.buf.as_ptr().add(rank);
313
314                // Read the current word.
315                let word = *bits_ptr;
316
317                if (word & mask) != 0 {
318                    // Duplicate: drop old value before overwriting.
319                    core::ptr::drop_in_place((*buf_slot).as_mut_ptr());
320                    // (Bit already set; no need to set again; don't
321                    // bump `filled`.)
322                } else {
323                    // First time we see this rank.
324                    *bits_ptr = word | mask;
325                    filled += 1;
326                }
327
328                // Write new value into the slot.
329                (*buf_slot).write(value);
330            }
331        }
332
333        if filled != n {
334            // Missing ranks: actual = number of distinct ranks seen.
335            return Err(crate::v1::Error::InvalidRankCardinality {
336                expected: n,
337                actual: filled,
338            });
339        }
340
341        // Success: prevent cleanup.
342        guard.disarm();
343
344        // SAFETY: all n slots are initialized
345        let ranks = unsafe {
346            let ptr = buf.as_mut_ptr() as *mut T;
347            let len = buf.len();
348            let cap = buf.capacity();
349            // Prevent `buf` (Vec<MaybeUninit<T>>) from freeing the
350            // allocation. Ownership of the buffer is about to be
351            // transferred to `Vec<T>` via `from_raw_parts`.
352            // Forgetting avoids a double free.
353            mem::forget(buf);
354            Vec::from_raw_parts(ptr, len, cap)
355        };
356
357        Ok(Self::new_unchecked(region, ranks))
358    }
359}
360
361#[cfg(test)]
362mod tests {
363    use std::convert::Infallible;
364    use std::future::Future;
365    use std::pin::Pin;
366    use std::task::Context;
367    use std::task::Poll;
368    use std::task::RawWaker;
369    use std::task::RawWakerVTable;
370    use std::task::Waker;
371
372    use futures::executor::block_on;
373    use futures::future;
374    use ndslice::extent;
375    use ndslice::strategy::gen_region;
376    use ndslice::view::CollectExactMeshExt;
377    use ndslice::view::CollectIndexedMeshExt;
378    use ndslice::view::CollectMeshExt;
379    use ndslice::view::MapIntoExt;
380    use ndslice::view::Ranked;
381    use ndslice::view::ViewExt;
382    use proptest::prelude::*;
383    use proptest::strategy::ValueTree;
384
385    use super::*;
386
387    #[test]
388    fn value_mesh_new_ok() {
389        let region: Region = extent!(replica = 2, gpu = 3).into();
390        let mesh = ValueMesh::new(region.clone(), (0..6).collect()).expect("new should succeed");
391        assert_eq!(mesh.region().num_ranks(), 6);
392        assert_eq!(mesh.values().count(), 6);
393        assert_eq!(mesh.values().collect::<Vec<_>>(), vec![0, 1, 2, 3, 4, 5]);
394    }
395
396    #[test]
397    fn value_mesh_new_len_mismatch_is_error() {
398        let region: Region = extent!(replica = 2, gpu = 3).into();
399        let err = ValueMesh::new(region, vec![0_i32; 5]).unwrap_err();
400        match err {
401            crate::v1::Error::InvalidRankCardinality { expected, actual } => {
402                assert_eq!(expected, 6);
403                assert_eq!(actual, 5);
404            }
405            other => panic!("unexpected error: {other:?}"),
406        }
407    }
408
409    #[test]
410    fn value_mesh_transpose_ok_and_err() {
411        let region: Region = extent!(x = 2).into();
412
413        // ok case
414        let ok_mesh = ValueMesh::new(region.clone(), vec![Ok::<_, Infallible>(1), Ok(2)]).unwrap();
415        let ok = ok_mesh.transpose().unwrap();
416        assert_eq!(ok.values().collect::<Vec<_>>(), vec![1, 2]);
417
418        // err case: propagate user E
419        #[derive(Debug, PartialEq)]
420        enum E {
421            Boom,
422        }
423        let err_mesh = ValueMesh::new(region, vec![Ok(1), Err(E::Boom)]).unwrap();
424        let err = err_mesh.transpose().unwrap_err();
425        assert_eq!(err, E::Boom);
426    }
427
428    #[test]
429    fn value_mesh_join_preserves_region_and_values() {
430        let region: Region = extent!(x = 2, y = 2).into();
431        let futs = vec![
432            future::ready(10),
433            future::ready(11),
434            future::ready(12),
435            future::ready(13),
436        ];
437        let mesh = ValueMesh::new(region.clone(), futs).unwrap();
438
439        let joined = block_on(mesh.join());
440        assert_eq!(joined.region().num_ranks(), 4);
441        assert_eq!(joined.values().collect::<Vec<_>>(), vec![10, 11, 12, 13]);
442    }
443
444    #[test]
445    fn collect_mesh_ok() {
446        let region: Region = extent!(x = 2, y = 3).into();
447        let mesh = (0..6)
448            .collect_mesh::<ValueMesh<_>>(region.clone())
449            .expect("collect_mesh should succeed");
450
451        assert_eq!(mesh.region().num_ranks(), 6);
452        assert_eq!(mesh.values().collect::<Vec<_>>(), vec![0, 1, 2, 3, 4, 5]);
453    }
454
455    #[test]
456    fn collect_mesh_len_too_short_is_error() {
457        let region: Region = extent!(x = 2, y = 3).into();
458        let err = (0..5).collect_mesh::<ValueMesh<_>>(region).unwrap_err();
459
460        match err {
461            crate::v1::Error::InvalidRankCardinality { expected, actual } => {
462                assert_eq!(expected, 6);
463                assert_eq!(actual, 5);
464            }
465            other => panic!("unexpected error: {other:?}"),
466        }
467    }
468
469    #[test]
470    fn collect_mesh_len_too_long_is_error() {
471        let region: Region = extent!(x = 2, y = 3).into();
472        let err = (0..7).collect_mesh::<ValueMesh<_>>(region).unwrap_err();
473        match err {
474            crate::v1::Error::InvalidRankCardinality { expected, actual } => {
475                assert_eq!(expected, 6);
476                assert_eq!(actual, 7);
477            }
478            other => panic!("unexpected error: {other:?}"),
479        }
480    }
481
482    #[test]
483    fn collect_mesh_from_map_pipeline() {
484        let region: Region = extent!(x = 2, y = 2).into();
485        let mesh = (0..4)
486            .map(|i| i * 10)
487            .collect_mesh::<ValueMesh<_>>(region.clone())
488            .unwrap();
489
490        assert_eq!(mesh.region().num_ranks(), 4);
491        assert_eq!(mesh.values().collect::<Vec<_>>(), vec![0, 10, 20, 30]);
492    }
493
494    #[test]
495    fn collect_exact_mesh_ok() {
496        let region: Region = extent!(x = 2, y = 3).into();
497        let mesh = (0..6)
498            .collect_exact_mesh::<ValueMesh<_>>(region.clone())
499            .expect("collect_exact_mesh should succeed");
500
501        assert_eq!(mesh.region().num_ranks(), 6);
502        assert_eq!(mesh.values().collect::<Vec<_>>(), vec![0, 1, 2, 3, 4, 5]);
503    }
504
505    #[test]
506    fn collect_exact_mesh_len_too_short_is_error() {
507        let region: Region = extent!(x = 2, y = 3).into();
508        let err = (0..5)
509            .collect_exact_mesh::<ValueMesh<_>>(region)
510            .unwrap_err();
511
512        match err {
513            crate::v1::Error::InvalidRankCardinality { expected, actual } => {
514                assert_eq!(expected, 6);
515                assert_eq!(actual, 5);
516            }
517            other => panic!("unexpected error: {other:?}"),
518        }
519    }
520
521    #[test]
522    fn collect_exact_mesh_len_too_long_is_error() {
523        let region: Region = extent!(x = 2, y = 3).into();
524        let err = (0..7)
525            .collect_exact_mesh::<ValueMesh<_>>(region)
526            .unwrap_err();
527
528        match err {
529            crate::v1::Error::InvalidRankCardinality { expected, actual } => {
530                assert_eq!(expected, 6);
531                assert_eq!(actual, 7);
532            }
533            other => panic!("unexpected error: {other:?}"),
534        }
535    }
536
537    #[test]
538    fn collect_exact_mesh_from_map_pipeline() {
539        let region: Region = extent!(x = 2, y = 2).into();
540        let mesh = (0..4)
541            .map(|i| i * 10)
542            .collect_exact_mesh::<ValueMesh<_>>(region.clone())
543            .unwrap();
544
545        assert_eq!(mesh.region().num_ranks(), 4);
546        assert_eq!(mesh.values().collect::<Vec<_>>(), vec![0, 10, 20, 30]);
547    }
548
549    #[test]
550    fn collect_indexed_ok_shuffled() {
551        let region: Region = extent!(x = 2, y = 3).into();
552        // (rank, value) in shuffled order; values = rank * 10
553        let pairs = vec![(3, 30), (0, 0), (5, 50), (2, 20), (1, 10), (4, 40)];
554        let mesh = pairs
555            .into_iter()
556            .collect_indexed::<ValueMesh<_>>(region.clone())
557            .unwrap();
558
559        assert_eq!(mesh.region().num_ranks(), 6);
560        assert_eq!(
561            mesh.values().collect::<Vec<_>>(),
562            vec![0, 10, 20, 30, 40, 50]
563        );
564    }
565
566    #[test]
567    fn collect_indexed_missing_rank_is_error() {
568        let region: Region = extent!(x = 2, y = 2).into(); // 4
569        // Missing rank 3
570        let pairs = vec![(0, 100), (1, 101), (2, 102)];
571        let err = pairs
572            .into_iter()
573            .collect_indexed::<ValueMesh<_>>(region)
574            .unwrap_err();
575
576        match err {
577            crate::v1::Error::InvalidRankCardinality { expected, actual } => {
578                assert_eq!(expected, 4);
579                assert_eq!(actual, 3); // Distinct ranks seen.
580            }
581            other => panic!("unexpected error: {other:?}"),
582        }
583    }
584
585    #[test]
586    fn collect_indexed_out_of_bounds_is_error() {
587        let region: Region = extent!(x = 2, y = 2).into(); // 4 (valid ranks 0..=3)
588        let pairs = vec![(0, 1), (4, 9)]; // 4 is out-of-bounds
589        let err = pairs
590            .into_iter()
591            .collect_indexed::<ValueMesh<_>>(region)
592            .unwrap_err();
593
594        match err {
595            crate::v1::Error::InvalidRankCardinality { expected, actual } => {
596                assert_eq!(expected, 4);
597                assert_eq!(actual, 5); // offending index + 1
598            }
599            other => panic!("unexpected error: {other:?}"),
600        }
601    }
602
603    #[test]
604    fn collect_indexed_duplicate_last_write_wins() {
605        let region: Region = extent!(x = 1, y = 3).into(); // 3
606        // rank 1 appears twice; last value should stick
607        let pairs = vec![(0, 7), (1, 8), (1, 88), (2, 9)];
608        let mesh = pairs
609            .into_iter()
610            .collect_indexed::<ValueMesh<_>>(region.clone())
611            .unwrap();
612
613        assert_eq!(mesh.values().collect::<Vec<_>>(), vec![7, 88, 9]);
614    }
615
616    // Indexed collector naïve implementation (for reference).
617    fn build_value_mesh_indexed<T>(
618        region: Region,
619        pairs: impl IntoIterator<Item = (usize, T)>,
620    ) -> crate::v1::Result<ValueMesh<T>> {
621        let n = region.num_ranks();
622
623        // Buffer for exactly n slots; fill by rank.
624        let mut buf: Vec<Option<T>> = std::iter::repeat_with(|| None).take(n).collect();
625        let mut filled = 0usize;
626
627        for (rank, value) in pairs {
628            if rank >= n {
629                // Out-of-bounds: report `expected` = n, `actual` =
630                // offending index + 1; i.e. number of ranks implied
631                // so far.
632                return Err(crate::v1::Error::InvalidRankCardinality {
633                    expected: n,
634                    actual: rank + 1,
635                });
636            }
637            if buf[rank].is_none() {
638                filled += 1;
639            }
640            buf[rank] = Some(value); // Last write wins.
641        }
642
643        if filled != n {
644            // Missing ranks: actual = number of distinct ranks seen.
645            return Err(crate::v1::Error::InvalidRankCardinality {
646                expected: n,
647                actual: filled,
648            });
649        }
650
651        // All present and in-bounds: unwrap and build unchecked.
652        let ranks: Vec<T> = buf.into_iter().map(Option::unwrap).collect();
653        Ok(ValueMesh::new_unchecked(region, ranks))
654    }
655
656    /// This uses the bit-mixing portion of Sebastiano Vigna's
657    /// [SplitMix64 algorithm](https://prng.di.unimi.it/splitmix64.c)
658    /// to generate a high-quality 64-bit hash from a usize index.
659    /// Unlike the full SplitMix64 generator, this is stateless - we
660    /// accept an arbitrary x as input and apply the mix function to
661    /// turn `x` deterministically into a "randomized" u64. input
662    /// always produces the same output.
663    fn hash_key(x: usize) -> u64 {
664        let mut z = x as u64 ^ 0x9E3779B97F4A7C15;
665        z = (z ^ (z >> 30)).wrapping_mul(0xBF58476D1CE4E5B9);
666        z = (z ^ (z >> 27)).wrapping_mul(0x94D049BB133111EB);
667        z ^ (z >> 31)
668    }
669
670    /// Shuffle a slice deterministically, using a hash of indices as
671    /// the key.
672    ///
673    /// Each position `i` is assigned a pseudo-random 64-bit key (from
674    /// `key(i)`), the slice is sorted by those keys, and the
675    /// resulting permutation is applied in place.
676    ///
677    /// The permutation is fully determined by the sequence of indices
678    /// `0..n` and the chosen `key` function. Running it twice on the
679    /// same input yields the same "random-looking" arrangement.
680    ///
681    /// This is going to be used (below) for property tests: it gives
682    /// the effect of a shuffle without introducing global RNG state,
683    /// and ensures that duplicate elements are still ordered
684    /// consistently (so we can test "last write wins" semantics in
685    /// collectors).
686    fn pseudo_shuffle<'a, T: 'a>(v: &'a mut [T], key: impl Fn(usize) -> u64 + Copy) {
687        // Build perm.
688        let mut with_keys: Vec<(u64, usize)> = (0..v.len()).map(|i| (key(i), i)).collect();
689        with_keys.sort_by_key(|&(k, _)| k);
690        let perm: Vec<usize> = with_keys.into_iter().map(|(_, i)| i).collect();
691
692        // In-place permutation using a cycle based approach (e.g.
693        // https://www.geeksforgeeks.org/dsa/permute-the-elements-of-an-array-following-given-order/).
694        let mut seen = vec![false; v.len()];
695        for i in 0..v.len() {
696            if seen[i] {
697                continue;
698            }
699            let mut a = i;
700            while !seen[a] {
701                seen[a] = true;
702                let b = perm[a];
703                // Short circuit on the cycle's start index.
704                if b == i {
705                    break;
706                }
707                v.swap(a, b);
708                a = b;
709            }
710        }
711    }
712
713    // Property: Optimized and reference collectors yield the same
714    // `ValueMesh` on complete inputs, even with duplicates.
715    //
716    // - Begin with a complete set of `(rank, value)` pairs covering
717    //   all ranks of the region.
718    // - Add extra pairs at arbitrary ranks (up to `extra_len`), which
719    //   necessarily duplicate existing entries when `extra_len > 0`.
720    // - Shuffle the combined pairs deterministically.
721    // - Collect using both the reference (`try_collect_indexed`) and
722    //   optimized (`try_collect_indexed_opt`) implementations.
723    //
724    // Both collectors must succeed and produce identical results.
725    // This demonstrates that the optimized version preserves
726    // last-write-wins semantics and agrees exactly with the reference
727    // behavior.
728    proptest! {
729        #[test]
730        fn try_collect_opt_equivalence(region in gen_region(1..=4, 6), extra_len in 0usize..=12) {
731            let n = region.num_ranks();
732
733            // Start with one pair per rank (coverage guaranteed).
734            let mut pairs: Vec<(usize, i64)> = (0..n).map(|r| (r, r as i64)).collect();
735
736            // Add some extra duplicates of random in-bounds ranks.
737            // Their values differ so last-write-wins is observable.
738            let extras = proptest::collection::vec(0..n, extra_len)
739                .new_tree(&mut proptest::test_runner::TestRunner::default())
740                .unwrap()
741                .current();
742            for (k, r) in extras.into_iter().enumerate() {
743                pairs.push((r, (n as i64) + (k as i64)));
744            }
745
746            // Deterministic "shuffle" to fix iteration order across
747            // both collectors.
748            pseudo_shuffle(&mut pairs, hash_key);
749
750            // Reference vs optimized.
751            let mesh_ref = build_value_mesh_indexed(region.clone(), pairs.clone()).unwrap();
752            let mesh_opt = pairs.into_iter().collect_indexed::<ValueMesh<_>>(region.clone()).unwrap();
753
754            prop_assert_eq!(mesh_ref.region(), mesh_opt.region());
755            prop_assert_eq!(mesh_ref.values().collect::<Vec<_>>(), mesh_opt.values().collect::<Vec<_>>());
756        }
757    }
758
759    // Property: Optimized and reference collectors report identical
760    // errors when ranks are missing.
761    //
762    // - Begin with a complete set of `(rank, value)` pairs.
763    // - Remove one rank so coverage is incomplete.
764    // - Shuffle deterministically.
765    // - Collect with both implementations.
766    //
767    // Both must fail with `InvalidRankCardinality` describing the
768    // same expected vs. actual counts.
769    proptest! {
770        #[test]
771        fn try_collect_opt_missing_rank_errors_match(region in gen_region(1..=4, 6)) {
772            let n = region.num_ranks();
773            // Base complete.
774            let mut pairs: Vec<(usize, i64)> = (0..n).map(|r| (r, r as i64)).collect();
775            // Drop one distinct rank.
776            if n > 0 {
777                let drop_idx = 0usize; // Deterministic, fine for the property.
778                pairs.remove(drop_idx);
779            }
780            // Shuffle deterministically.
781            pseudo_shuffle(&mut pairs, hash_key);
782
783            let ref_err  = build_value_mesh_indexed(region.clone(), pairs.clone()).unwrap_err();
784            let opt_err  = pairs.into_iter().collect_indexed::<ValueMesh<_>>(region).unwrap_err();
785            assert_eq!(format!("{ref_err:?}"), format!("{opt_err:?}"));
786        }
787    }
788
789    // Property: Optimized and reference collectors report identical
790    // errors when given out-of-bounds ranks.
791    //
792    // - Construct a set of `(rank, value)` pairs.
793    // - Include at least one pair whose rank is ≥
794    //   `region.num_ranks()`.
795    // - Shuffle deterministically.
796    // - Collect with both implementations.
797    //
798    // Both must fail with `InvalidRankCardinality`, and the reported
799    // error values must match exactly.
800    proptest! {
801        #[test]
802        fn try_collect_opt_out_of_bound_errors_match(region in gen_region(1..=4, 6)) {
803            let n = region.num_ranks();
804            // One valid, then one out-of-bound.
805            let mut pairs = vec![(0usize, 0i64), (n, 123i64)];
806            pseudo_shuffle(&mut pairs, hash_key);
807
808            let ref_err = build_value_mesh_indexed(region.clone(), pairs.clone()).unwrap_err();
809            let opt_err = pairs.into_iter().collect_indexed::<ValueMesh<_>>(region).unwrap_err();
810            assert_eq!(format!("{ref_err:?}"), format!("{opt_err:?}"));
811        }
812    }
813
814    #[test]
815    fn map_into_preserves_region_and_order() {
816        let region: Region = extent!(rows = 2, cols = 3).into();
817        let vm = ValueMesh::new_unchecked(region.clone(), vec![0, 1, 2, 3, 4, 5]);
818
819        let doubled: ValueMesh<_> = vm.map_into(|x| x * 2);
820        assert_eq!(doubled.region, region);
821        assert_eq!(doubled.ranks, vec![0, 2, 4, 6, 8, 10]);
822    }
823
824    #[test]
825    fn map_into_ref_borrows_and_preserves() {
826        let region: Region = extent!(n = 4).into();
827        let vm = ValueMesh::new_unchecked(
828            region.clone(),
829            vec!["a".to_string(), "b".into(), "c".into(), "d".into()],
830        );
831
832        let lens: ValueMesh<_> = vm.map_into(|s| s.len());
833        assert_eq!(lens.region, region);
834        assert_eq!(lens.ranks, vec![1, 1, 1, 1]);
835    }
836
837    #[test]
838    fn try_map_into_short_circuits_on_error() {
839        let region = extent!(n = 4).into();
840        let vm = ValueMesh::new_unchecked(region, vec![1, 2, 3, 4]);
841
842        let res: Result<ValueMesh<i32>, &'static str> =
843            vm.try_map_into(|x| if x == &3 { Err("boom") } else { Ok(x + 10) });
844
845        assert!(res.is_err());
846        assert_eq!(res.unwrap_err(), "boom");
847    }
848
849    #[test]
850    fn try_map_into_ref_short_circuits_on_error() {
851        let region = extent!(n = 4).into();
852        let vm = ValueMesh::new_unchecked(region, vec![1, 2, 3, 4]);
853
854        let res: Result<ValueMesh<i32>, &'static str> =
855            vm.try_map_into(|x| if x == &3 { Err("boom") } else { Ok(x + 10) });
856
857        assert!(res.is_err());
858        assert_eq!(res.unwrap_err(), "boom");
859    }
860
861    // -- Helper to poll `core::future::Ready` without a runtime
862    fn noop_waker() -> Waker {
863        fn clone(_: *const ()) -> RawWaker {
864            RawWaker::new(std::ptr::null(), &VTABLE)
865        }
866        fn wake(_: *const ()) {}
867        fn wake_by_ref(_: *const ()) {}
868        fn drop(_: *const ()) {}
869        static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, wake, wake_by_ref, drop);
870        // SAFETY: The raw waker never dereferences its data pointer
871        // (`null`), and all vtable fns are no-ops. It's only used to
872        // satisfy `Context` for polling already-ready futures in
873        // tests.
874        unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
875    }
876
877    fn poll_now<F: Future>(mut fut: F) -> F::Output {
878        let waker = noop_waker();
879        let mut cx = Context::from_waker(&waker);
880        // SAFETY: `fut` is a local stack variable that we never move
881        // after pinning, and we only use it to poll immediately
882        // within this scope. This satisfies the invariants of
883        // `Pin::new_unchecked`.
884        let mut fut = unsafe { Pin::new_unchecked(&mut fut) };
885        match fut.as_mut().poll(&mut cx) {
886            Poll::Ready(v) => v,
887            Poll::Pending => unreachable!("Ready futures must complete immediately"),
888        }
889    }
890    // --
891
892    #[test]
893    fn map_into_ready_futures() {
894        let region: Region = extent!(r = 2, c = 2).into();
895        let vm = ValueMesh::new_unchecked(region.clone(), vec![10, 20, 30, 40]);
896
897        // Map to `core::future::Ready` futures.
898        let pending: ValueMesh<core::future::Ready<_>> =
899            vm.map_into(|x| core::future::ready(x + 1));
900        assert_eq!(pending.region, region);
901
902        // Drive the ready futures without a runtime and collect results.
903        let results: Vec<_> = pending.ranks.into_iter().map(poll_now).collect();
904        assert_eq!(results, vec![11, 21, 31, 41]);
905    }
906
907    #[test]
908    fn map_into_single_element_mesh() {
909        let region: Region = extent!(n = 1).into();
910        let vm = ValueMesh::new_unchecked(region.clone(), vec![7]);
911
912        let out: ValueMesh<_> = vm.map_into(|x| x * x);
913        assert_eq!(out.region, region);
914        assert_eq!(out.ranks, vec![49]);
915    }
916
917    #[test]
918    fn map_into_ref_with_non_clone_field() {
919        // A type that intentionally does NOT implement Clone.
920        #[derive(Debug, PartialEq, Eq)]
921        struct NotClone(i32);
922
923        let region: Region = extent!(x = 3).into();
924        let values = vec![(10, NotClone(1)), (20, NotClone(2)), (30, NotClone(3))];
925        let mesh: ValueMesh<(i32, NotClone)> =
926            values.into_iter().collect_mesh(region.clone()).unwrap();
927
928        let projected: ValueMesh<i32> = mesh.map_into(|t| t.0);
929        assert_eq!(projected.values().collect::<Vec<_>>(), vec![10, 20, 30]);
930        assert_eq!(projected.region(), &region);
931    }
932}