1use std::cmp::Ordering;
15use std::collections::HashMap;
16use std::collections::hash_map::Entry;
17use std::hash::Hash;
18use std::marker::PhantomData;
19use std::mem;
20use std::mem::MaybeUninit;
21use std::ops::Range;
22use std::ptr;
23use std::ptr::NonNull;
24
25use futures::Future;
26use hyperactor::accum::Accumulator;
27use hyperactor::accum::CommReducer;
28use hyperactor::accum::ReducerFactory;
29use hyperactor::accum::ReducerSpec;
30use ndslice::extent;
31use ndslice::view;
32use ndslice::view::Ranked;
33use ndslice::view::Region;
34use serde::Deserialize;
35use serde::Serialize;
36use typeuri::Named;
37
38mod rle;
39mod value_overlay;
40pub use value_overlay::BuildError;
41pub use value_overlay::ValueOverlay;
42
43#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)] pub struct ValueMesh<T> {
52 region: Region,
57
58 rep: Rep<T>,
69}
70
71#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
82struct Run {
83 start: u64,
85 end: u64,
87 id: u32,
89}
90
91impl Run {
92 fn new(start: usize, end: usize, id: u32) -> Self {
97 Self {
98 start: start as u64,
99 end: end as u64,
100 id,
101 }
102 }
103}
104
105impl TryFrom<Run> for (Range<usize>, u32) {
106 type Error = &'static str;
107
108 #[allow(clippy::result_large_err)]
117 fn try_from(r: Run) -> Result<Self, Self::Error> {
118 let start = usize::try_from(r.start).map_err(|_| "run.start too large")?;
119 let end = usize::try_from(r.end).map_err(|_| "run.end too large")?;
120 Ok((start..end, r.id))
121 }
122}
123
124#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)] enum Rep<T> {
139 Dense {
146 values: Vec<T>,
149 },
150
151 Compressed {
163 table: Vec<T>,
165
166 runs: Vec<Run>,
169 },
170}
171
172impl<T> Default for ValueMesh<T> {
177 fn default() -> Self {
178 Self::empty()
179 }
180}
181
182impl<T> ValueMesh<T> {
183 pub fn empty() -> Self {
190 let region = extent!(r = 0).into();
192 Self::new_unchecked(region, Vec::<T>::new())
193 }
194
195 #[allow(clippy::result_large_err)]
209 pub(crate) fn new(region: Region, ranks: Vec<T>) -> crate::Result<Self> {
210 let (actual, expected) = (ranks.len(), region.num_ranks());
211 if actual != expected {
212 return Err(crate::Error::InvalidRankCardinality { expected, actual });
213 }
214 Ok(Self {
215 region,
216 rep: Rep::Dense { values: ranks },
217 })
218 }
219
220 #[inline]
223 pub(crate) fn new_unchecked(region: Region, ranks: Vec<T>) -> Self {
224 debug_assert_eq!(region.num_ranks(), ranks.len());
225 Self {
226 region,
227 rep: Rep::Dense { values: ranks },
228 }
229 }
230}
231
232impl<T: Clone> ValueMesh<T> {
233 pub fn from_single(region: Region, s: T) -> Self {
241 let n = region.num_ranks();
242 if n == 0 {
243 return Self {
244 region,
245 rep: Rep::Compressed {
246 table: Vec::new(),
247 runs: Vec::new(),
248 },
249 };
250 }
251
252 let table = vec![s];
253 let runs = vec![Run::new(0, n, 0)];
254 Self {
255 region,
256 rep: Rep::Compressed { table, runs },
257 }
258 }
259}
260
261impl<T: Clone + Default> ValueMesh<T> {
262 pub fn from_default(region: Region) -> Self {
268 ValueMesh::<T>::from_single(region, T::default())
269 }
270}
271
272impl<T: Eq + Hash> ValueMesh<T> {
273 #[allow(clippy::result_large_err)]
282 pub fn from_ranges_with_default(
283 region: Region,
284 default: T,
285 mut ranges: Vec<(Range<usize>, T)>,
286 ) -> crate::Result<Self> {
287 let n = region.num_ranks();
288
289 if n == 0 {
290 return Ok(Self {
291 region,
292 rep: Rep::Compressed {
293 table: Vec::new(),
294 runs: Vec::new(),
295 },
296 });
297 }
298
299 for (r, _) in &ranges {
301 if r.is_empty() {
302 return Err(crate::Error::InvalidRankCardinality {
303 expected: n,
304 actual: 0,
305 }); }
307 if r.end > n {
308 return Err(crate::Error::InvalidRankCardinality {
309 expected: n,
310 actual: r.end,
311 });
312 }
313 }
314 ranges.sort_by_key(|(r, _)| (r.start, r.end));
315
316 for w in ranges.windows(2) {
318 let (a, _) = &w[0];
319 let (b, _) = &w[1];
320 if a.end > b.start {
321 return Err(crate::Error::InvalidRankCardinality {
323 expected: n,
324 actual: b.start, });
326 }
327 }
328
329 let mut index: HashMap<T, u32> = HashMap::with_capacity(1 + ranges.len());
331 let mut next_id: u32 = 0;
332
333 let id_of = |v: T, index: &mut HashMap<T, u32>, next_id: &mut u32| -> u32 {
336 match index.entry(v) {
337 Entry::Occupied(o) => *o.get(),
338 Entry::Vacant(vac) => {
339 let id = *next_id;
340 *next_id += 1;
341 vac.insert(id);
342 id
343 }
344 }
345 };
346
347 let default_id = id_of(default, &mut index, &mut next_id);
348
349 let mut runs: Vec<Run> = Vec::with_capacity(1 + 2 * ranges.len());
350 let mut cursor = 0usize;
351
352 for (r, v) in ranges.into_iter() {
353 if cursor < r.start {
355 runs.push(Run::new(cursor, r.start, default_id));
356 }
357 let id = id_of(v, &mut index, &mut next_id);
359 runs.push(Run::new(r.start, r.end, id));
360 cursor = r.end;
361 }
362
363 if cursor < n {
365 runs.push(Run::new(cursor, n, default_id));
366 }
367
368 let mut table_slots: Vec<Option<T>> = Vec::new();
371 table_slots.resize_with(next_id as usize, || None);
372
373 for (t, id) in index.into_iter() {
374 table_slots[id as usize] = Some(t);
375 }
376
377 let table: Vec<T> = table_slots
378 .into_iter()
379 .map(|o| o.expect("every id must be assigned"))
380 .collect();
381
382 Ok(Self {
383 region,
384 rep: Rep::Compressed { table, runs },
385 })
386 }
387
388 #[allow(clippy::result_large_err)]
418 pub fn from_dense(region: Region, values: Vec<T>) -> crate::Result<Self> {
419 let mut vm = Self::new(region, values)?;
420 vm.compress_adjacent_in_place();
421 Ok(vm)
422 }
423}
424
425impl<F: Future> ValueMesh<F> {
426 pub async fn join(self) -> ValueMesh<F::Output> {
429 let ValueMesh { region, rep } = self;
430
431 match rep {
432 Rep::Dense { values } => {
433 let results = futures::future::join_all(values).await;
434 ValueMesh::new_unchecked(region, results)
435 }
436 Rep::Compressed { .. } => {
437 unreachable!("join() not implemented for compressed meshes")
438 }
439 }
440 }
441}
442
443impl<T, E> ValueMesh<Result<T, E>> {
444 pub fn transpose(self) -> Result<ValueMesh<T>, E> {
447 let ValueMesh { region, rep } = self;
448
449 match rep {
450 Rep::Dense { values } => {
451 let values = values.into_iter().collect::<Result<Vec<T>, E>>()?;
452 Ok(ValueMesh::new_unchecked(region, values))
453 }
454 Rep::Compressed { table, runs } => {
455 let table: Vec<T> = table.into_iter().collect::<Result<Vec<T>, E>>()?;
456 Ok(ValueMesh {
457 region,
458 rep: Rep::Compressed { table, runs },
459 })
460 }
461 }
462 }
463}
464
465impl<T: 'static> view::Ranked for ValueMesh<T> {
466 type Item = T;
467
468 fn region(&self) -> &Region {
471 &self.region
472 }
473
474 fn get(&self, rank: usize) -> Option<&Self::Item> {
486 if rank >= self.region.num_ranks() {
487 return None;
488 }
489
490 match &self.rep {
491 Rep::Dense { values } => values.get(rank),
492
493 Rep::Compressed { table, runs } => {
494 let rank = rank as u64;
495
496 let idx = runs
499 .binary_search_by(|run| {
500 if run.end <= rank {
501 Ordering::Less
502 } else if run.start > rank {
503 Ordering::Greater
504 } else {
505 Ordering::Equal
506 }
507 })
508 .ok()?;
509
510 let id = runs[idx].id as usize;
512 table.get(id)
513 }
514 }
515 }
516}
517
518impl<T: Clone + 'static> view::RankedSliceable for ValueMesh<T> {
519 fn sliced(&self, region: Region) -> Self {
520 debug_assert!(region.is_subset(self.region()), "sliced: not a subset");
521 let ranks: Vec<T> = self
522 .region()
523 .remap(®ion)
524 .unwrap()
525 .map(|index| self.get(index).unwrap().clone())
526 .collect();
527 debug_assert_eq!(
528 region.num_ranks(),
529 ranks.len(),
530 "sliced: cardinality mismatch"
531 );
532 Self::new_unchecked(region, ranks)
533 }
534}
535
536impl<T> view::BuildFromRegion<T> for ValueMesh<T> {
537 type Error = crate::Error;
538
539 fn build_dense(region: Region, values: Vec<T>) -> Result<Self, Self::Error> {
540 Self::new(region, values)
541 }
542
543 fn build_dense_unchecked(region: Region, values: Vec<T>) -> Self {
544 Self::new_unchecked(region, values)
545 }
546}
547
548impl<T> view::BuildFromRegionIndexed<T> for ValueMesh<T> {
549 type Error = crate::Error;
550
551 fn build_indexed(
552 region: Region,
553 pairs: impl IntoIterator<Item = (usize, T)>,
554 ) -> Result<Self, Self::Error> {
555 let n = region.num_ranks();
556
557 let mut buf: Vec<MaybeUninit<T>> = Vec::with_capacity(n);
563 unsafe {
569 buf.set_len(n);
570 }
571
572 let words = n.div_ceil(64);
574 let mut bits = vec![0u64; words];
575 let mut filled = 0usize;
576
577 struct DropGuard<T> {
583 buf: NonNull<MaybeUninit<T>>,
584 bits: NonNull<u64>,
585 n_elems: usize,
586 n_words: usize,
587 disarm: bool,
588 }
589
590 impl<T> DropGuard<T> {
591 unsafe fn new(buf: &mut [MaybeUninit<T>], bits: &mut [u64]) -> Self {
610 let n_elems = buf.len();
611 let n_words = bits.len();
612 Self {
615 buf: NonNull::new(buf.as_mut_ptr()).unwrap_or_else(NonNull::dangling),
616 bits: NonNull::new(bits.as_mut_ptr()).unwrap_or_else(NonNull::dangling),
617 n_elems,
618 n_words,
619 disarm: false,
620 }
621 }
622
623 #[inline]
624 fn disarm(&mut self) {
625 self.disarm = true;
626 }
627 }
628
629 impl<T> Drop for DropGuard<T> {
630 fn drop(&mut self) {
631 if self.disarm {
632 return;
633 }
634
635 unsafe {
653 let buf_base = self.buf.as_ptr();
654 let bits_base = self.bits.as_ptr();
655
656 for w in 0..self.n_words {
657 let mut word = *bits_base.add(w);
659
660 if w == self.n_words.saturating_sub(1) {
663 let used_bits = self.n_elems.saturating_sub(w * 64);
664 if used_bits < 64 {
665 let mask = if used_bits == 0 {
666 0
667 } else {
668 (1u64 << used_bits) - 1
669 };
670 word &= mask;
671 }
672 }
673
674 while word != 0 {
676 let tz = word.trailing_zeros() as usize;
677 let i = w * 64 + tz;
678 debug_assert!(i < self.n_elems);
679
680 let slot = buf_base.add(i);
681 ptr::drop_in_place((*slot).as_mut_ptr());
683
684 word &= word - 1;
686 }
687 }
688 }
689 }
690 }
691
692 let mut guard = unsafe { DropGuard::new(&mut buf, &mut bits) };
701
702 for (rank, value) in pairs {
703 if rank >= guard.n_elems {
705 return Err(crate::Error::InvalidRankCardinality {
706 expected: guard.n_elems,
707 actual: rank + 1,
708 });
709 }
710
711 let w = rank / 64;
713 let b = rank % 64;
714 let mask = 1u64 << b;
715
716 unsafe {
735 let bits_ptr = guard.bits.as_ptr().add(w);
737 let buf_slot = guard.buf.as_ptr().add(rank);
738
739 let word = *bits_ptr;
741
742 if (word & mask) != 0 {
743 core::ptr::drop_in_place((*buf_slot).as_mut_ptr());
745 } else {
748 *bits_ptr = word | mask;
750 filled += 1;
751 }
752
753 (*buf_slot).write(value);
755 }
756 }
757
758 if filled != n {
759 return Err(crate::Error::InvalidRankCardinality {
761 expected: n,
762 actual: filled,
763 });
764 }
765
766 guard.disarm();
768
769 let ranks = unsafe {
771 let ptr = buf.as_mut_ptr() as *mut T;
772 let len = buf.len();
773 let cap = buf.capacity();
774 mem::forget(buf);
779 Vec::from_raw_parts(ptr, len, cap)
780 };
781
782 Ok(Self::new_unchecked(region, ranks))
783 }
784}
785
786impl<T: PartialEq> ValueMesh<T> {
787 pub fn compress_adjacent_in_place(&mut self) {
807 self.compress_adjacent_in_place_by(|a, b| a == b)
808 }
809}
810
811impl<T: Clone + PartialEq> ValueMesh<T> {
812 fn materialized_runs(&self) -> Vec<(Range<usize>, T)> {
828 match &self.rep {
829 Rep::Dense { values } => {
830 let mut out = Vec::new();
832 if values.is_empty() {
833 return out;
834 }
835 let mut start = 0usize;
836 for i in 1..values.len() {
837 if values[i] != values[i - 1] {
838 out.push((start..i, values[i - 1].clone()));
839 start = i;
840 }
841 }
842 out.push((start..values.len(), values.last().unwrap().clone()));
843 out
844 }
845 Rep::Compressed { table, runs } => runs
846 .iter()
847 .map(|r| {
848 let id = r.id as usize;
849 ((r.start as usize..r.end as usize), table[id].clone())
850 })
851 .collect(),
852 }
853 }
854}
855
856impl<T: Clone + Eq> ValueMesh<T> {
857 pub fn merge_from_overlay(&mut self, overlay: ValueOverlay<T>) -> Result<(), BuildError> {
863 let n = self.region.num_ranks();
864
865 for (r, _) in overlay.runs() {
868 if r.end > n {
869 return Err(BuildError::OutOfBounds {
870 range: r.clone(),
871 region_len: n,
872 });
873 }
874 }
875
876 let left = self.materialized_runs();
878 let right: Vec<(std::ops::Range<usize>, T)> = overlay.runs().cloned().collect();
881
882 let merged = rle::merge_value_runs(left, right);
885
886 let (table, raw_runs) = rle::rle_from_value_runs(merged);
888 let runs = raw_runs
889 .into_iter()
890 .map(|(r, id)| Run::new(r.start, r.end, id))
891 .collect();
892 self.rep = Rep::Compressed { table, runs };
893
894 Ok(())
895 }
896}
897
898impl<T> ValueMesh<T> {
899 pub fn compress_adjacent_in_place_by<F>(&mut self, same: F)
921 where
922 F: FnMut(&T, &T) -> bool,
923 {
924 let values = match &mut self.rep {
925 Rep::Dense { values } => std::mem::take(values),
926 Rep::Compressed { .. } => return,
927 };
928 let (table, raw_runs) = rle::rle_from_dense(values, same);
929 let runs = raw_runs
930 .into_iter()
931 .map(|(r, id)| Run::new(r.start, r.end, id))
932 .collect();
933 self.rep = Rep::Compressed { table, runs };
934 }
935}
936
937impl<T> Accumulator for ValueMesh<T>
956where
957 T: Eq + Clone + Named,
958{
959 type State = Self;
960 type Update = ValueOverlay<T>;
961
962 fn accumulate(&self, state: &mut Self::State, update: Self::Update) -> anyhow::Result<()> {
963 if state.region().num_ranks() == 0 {
968 *state = self.clone();
969 }
970
971 state.merge_from_overlay(update)?;
974 Ok(())
975 }
976
977 fn reducer_spec(&self) -> Option<ReducerSpec> {
978 Some(ReducerSpec {
979 typehash: <ValueOverlayReducer<T> as Named>::typehash(),
980 builder_params: None,
981 })
982 }
983}
984
985#[derive(Named)]
992struct ValueOverlayReducer<T>(std::marker::PhantomData<T>);
993
994impl<T> CommReducer for ValueOverlayReducer<T>
1004where
1005 T: Eq + Clone + Named,
1006{
1007 type Update = ValueOverlay<T>;
1008
1009 fn reduce(&self, left: Self::Update, right: Self::Update) -> anyhow::Result<Self::Update> {
1011 let merged = crate::value_mesh::rle::merge_value_runs(
1013 left.runs().cloned().collect(),
1014 right.runs().cloned().collect(),
1015 );
1016 Ok(ValueOverlay::try_from_runs(merged)?)
1018 }
1019}
1020
1021hyperactor::internal_macro_support::inventory::submit! {
1024 ReducerFactory {
1025 typehash_f: <ValueOverlayReducer<crate::resource::Status> as Named>::typehash,
1026 builder_f: |_| Ok(Box::new(ValueOverlayReducer::<crate::resource::Status>(PhantomData))),
1027 }
1028}
1029
1030#[cfg(test)]
1031mod tests {
1032 use std::convert::Infallible;
1033 use std::future::Future;
1034 use std::pin::Pin;
1035 use std::task::Context;
1036 use std::task::Poll;
1037 use std::task::RawWaker;
1038 use std::task::RawWakerVTable;
1039 use std::task::Waker;
1040
1041 use futures::executor::block_on;
1042 use futures::future;
1043 use ndslice::extent;
1044 use ndslice::strategy::gen_region;
1045 use ndslice::view::CollectExactMeshExt;
1046 use ndslice::view::CollectIndexedMeshExt;
1047 use ndslice::view::CollectMeshExt;
1048 use ndslice::view::MapIntoExt;
1049 use ndslice::view::Ranked;
1050 use ndslice::view::RankedSliceable;
1051 use ndslice::view::ViewExt;
1052 use proptest::prelude::*;
1053 use proptest::strategy::ValueTree;
1054 use serde_json;
1055
1056 use super::*;
1057
/// `new` accepts exactly one value per rank and preserves their order.
#[test]
fn value_mesh_new_ok() {
    let region: Region = extent!(replica = 2, gpu = 3).into();
    let input: Vec<_> = (0..6).collect();
    let mesh = ValueMesh::new(region.clone(), input).expect("new should succeed");

    assert_eq!(mesh.region().num_ranks(), 6);
    assert_eq!(mesh.values().count(), 6);
    let seen: Vec<_> = mesh.values().collect();
    assert_eq!(seen, vec![0, 1, 2, 3, 4, 5]);
}
1066
/// `new` rejects a value vector whose length differs from the rank count.
#[test]
fn value_mesh_new_len_mismatch_is_error() {
    let region: Region = extent!(replica = 2, gpu = 3).into();
    let too_few = vec![0_i32; 5];

    let (expected, actual) = match ValueMesh::new(region, too_few).unwrap_err() {
        crate::Error::InvalidRankCardinality { expected, actual } => (expected, actual),
        other => panic!("unexpected error: {other:?}"),
    };
    assert_eq!(expected, 6);
    assert_eq!(actual, 5);
}
1079
/// `transpose` unwraps an all-`Ok` mesh and surfaces an `Err` otherwise.
#[test]
fn value_mesh_transpose_ok_and_err() {
    let region: Region = extent!(x = 2).into();

    // Every entry is Ok: the values come back unwrapped.
    let all_ok: Vec<Result<_, Infallible>> = vec![Ok(1), Ok(2)];
    let transposed = ValueMesh::new(region.clone(), all_ok)
        .unwrap()
        .transpose()
        .unwrap();
    assert_eq!(transposed.values().collect::<Vec<_>>(), vec![1, 2]);

    // One entry is Err: that error is returned.
    #[derive(Debug, PartialEq)]
    enum E {
        Boom,
    }
    let mixed = vec![Ok(1), Err(E::Boom)];
    let failure = ValueMesh::new(region, mixed)
        .unwrap()
        .transpose()
        .unwrap_err();
    assert_eq!(failure, E::Boom);
}
1098
/// `join` awaits every future and keeps rank order and rank count.
#[test]
fn value_mesh_join_preserves_region_and_values() {
    let region: Region = extent!(x = 2, y = 2).into();
    let pending: Vec<_> = (10..14).map(future::ready).collect();
    let mesh = ValueMesh::new(region.clone(), pending).unwrap();

    let joined = block_on(mesh.join());

    assert_eq!(joined.region().num_ranks(), 4);
    let outputs: Vec<_> = joined.values().collect();
    assert_eq!(outputs, vec![10, 11, 12, 13]);
}
1114
/// `collect_mesh` succeeds when the iterator yields one item per rank.
#[test]
fn collect_mesh_ok() {
    let region: Region = extent!(x = 2, y = 3).into();
    let source = 0..6;
    let mesh = source
        .collect_mesh::<ValueMesh<_>>(region.clone())
        .expect("collect_mesh should succeed");

    let seen: Vec<_> = mesh.values().collect();
    assert_eq!(mesh.region().num_ranks(), 6);
    assert_eq!(seen, vec![0, 1, 2, 3, 4, 5]);
}
1125
/// `collect_mesh` reports a cardinality error when items are missing.
#[test]
fn collect_mesh_len_too_short_is_error() {
    let region: Region = extent!(x = 2, y = 3).into();
    let outcome = (0..5).collect_mesh::<ValueMesh<_>>(region);

    let (expected, actual) = match outcome.unwrap_err() {
        crate::Error::InvalidRankCardinality { expected, actual } => (expected, actual),
        other => panic!("unexpected error: {other:?}"),
    };
    assert_eq!(expected, 6);
    assert_eq!(actual, 5);
}
1139
/// `collect_mesh` reports a cardinality error when there are extra items.
#[test]
fn collect_mesh_len_too_long_is_error() {
    let region: Region = extent!(x = 2, y = 3).into();
    let outcome = (0..7).collect_mesh::<ValueMesh<_>>(region);

    let (expected, actual) = match outcome.unwrap_err() {
        crate::Error::InvalidRankCardinality { expected, actual } => (expected, actual),
        other => panic!("unexpected error: {other:?}"),
    };
    assert_eq!(expected, 6);
    assert_eq!(actual, 7);
}
1152
/// `collect_mesh` works at the end of an iterator adapter chain.
#[test]
fn collect_mesh_from_map_pipeline() {
    let region: Region = extent!(x = 2, y = 2).into();
    let scaled = (0..4).map(|i| i * 10);
    let mesh = scaled.collect_mesh::<ValueMesh<_>>(region.clone()).unwrap();

    let seen: Vec<_> = mesh.values().collect();
    assert_eq!(mesh.region().num_ranks(), 4);
    assert_eq!(seen, vec![0, 10, 20, 30]);
}
1164
/// `collect_exact_mesh` succeeds when the iterator yields one item per rank.
#[test]
fn collect_exact_mesh_ok() {
    let region: Region = extent!(x = 2, y = 3).into();
    let source = 0..6;
    let mesh = source
        .collect_exact_mesh::<ValueMesh<_>>(region.clone())
        .expect("collect_exact_mesh should succeed");

    let seen: Vec<_> = mesh.values().collect();
    assert_eq!(mesh.region().num_ranks(), 6);
    assert_eq!(seen, vec![0, 1, 2, 3, 4, 5]);
}
1175
/// `collect_exact_mesh` reports a cardinality error when items are missing.
#[test]
fn collect_exact_mesh_len_too_short_is_error() {
    let region: Region = extent!(x = 2, y = 3).into();
    let outcome = (0..5).collect_exact_mesh::<ValueMesh<_>>(region);

    let (expected, actual) = match outcome.unwrap_err() {
        crate::Error::InvalidRankCardinality { expected, actual } => (expected, actual),
        other => panic!("unexpected error: {other:?}"),
    };
    assert_eq!(expected, 6);
    assert_eq!(actual, 5);
}
1191
/// `collect_exact_mesh` reports a cardinality error on extra items.
#[test]
fn collect_exact_mesh_len_too_long_is_error() {
    let region: Region = extent!(x = 2, y = 3).into();
    let outcome = (0..7).collect_exact_mesh::<ValueMesh<_>>(region);

    let (expected, actual) = match outcome.unwrap_err() {
        crate::Error::InvalidRankCardinality { expected, actual } => (expected, actual),
        other => panic!("unexpected error: {other:?}"),
    };
    assert_eq!(expected, 6);
    assert_eq!(actual, 7);
}
1207
/// `collect_exact_mesh` works at the end of an iterator adapter chain.
#[test]
fn collect_exact_mesh_from_map_pipeline() {
    let region: Region = extent!(x = 2, y = 2).into();
    let scaled = (0..4).map(|i| i * 10);
    let mesh = scaled
        .collect_exact_mesh::<ValueMesh<_>>(region.clone())
        .unwrap();

    let seen: Vec<_> = mesh.values().collect();
    assert_eq!(mesh.region().num_ranks(), 4);
    assert_eq!(seen, vec![0, 10, 20, 30]);
}
1219
/// `collect_indexed` places values by rank regardless of arrival order.
#[test]
fn collect_indexed_ok_shuffled() {
    let region: Region = extent!(x = 2, y = 3).into();
    let shuffled = vec![(3, 30), (0, 0), (5, 50), (2, 20), (1, 10), (4, 40)];

    let mesh = shuffled
        .into_iter()
        .collect_indexed::<ValueMesh<_>>(region.clone())
        .unwrap();

    let in_rank_order: Vec<_> = mesh.values().collect();
    assert_eq!(mesh.region().num_ranks(), 6);
    assert_eq!(in_rank_order, vec![0, 10, 20, 30, 40, 50]);
}
1236
/// `collect_indexed` fails when a rank is never supplied; `actual` is the
/// number of distinct ranks that were supplied.
#[test]
fn collect_indexed_missing_rank_is_error() {
    let region: Region = extent!(x = 2, y = 2).into();
    // Rank 3 is missing.
    let incomplete = vec![(0, 100), (1, 101), (2, 102)];

    let outcome = incomplete
        .into_iter()
        .collect_indexed::<ValueMesh<_>>(region);

    let (expected, actual) = match outcome.unwrap_err() {
        crate::Error::InvalidRankCardinality { expected, actual } => (expected, actual),
        other => panic!("unexpected error: {other:?}"),
    };
    assert_eq!(expected, 4);
    assert_eq!(actual, 3);
}
1255
/// `collect_indexed` fails on a rank outside the region; `actual` is the
/// offending rank plus one.
#[test]
fn collect_indexed_out_of_bounds_is_error() {
    let region: Region = extent!(x = 2, y = 2).into();
    // Rank 4 does not exist in a 4-rank region.
    let stray = vec![(0, 1), (4, 9)];

    let outcome = stray.into_iter().collect_indexed::<ValueMesh<_>>(region);

    let (expected, actual) = match outcome.unwrap_err() {
        crate::Error::InvalidRankCardinality { expected, actual } => (expected, actual),
        other => panic!("unexpected error: {other:?}"),
    };
    assert_eq!(expected, 4);
    assert_eq!(actual, 5);
}
1273
/// When a rank is supplied twice, the later value is the one kept.
#[test]
fn collect_indexed_duplicate_last_write_wins() {
    let region: Region = extent!(x = 1, y = 3).into();
    // Rank 1 appears twice: first 8, then 88.
    let with_duplicate = vec![(0, 7), (1, 8), (1, 88), (2, 9)];

    let mesh = with_duplicate
        .into_iter()
        .collect_indexed::<ValueMesh<_>>(region.clone())
        .unwrap();

    let kept: Vec<_> = mesh.values().collect();
    assert_eq!(kept, vec![7, 88, 9]);
}
1286
1287 #[allow(clippy::result_large_err)]
1289 fn build_value_mesh_indexed<T>(
1290 region: Region,
1291 pairs: impl IntoIterator<Item = (usize, T)>,
1292 ) -> crate::Result<ValueMesh<T>> {
1293 let n = region.num_ranks();
1294
1295 let mut buf: Vec<Option<T>> = std::iter::repeat_with(|| None).take(n).collect();
1297 let mut filled = 0usize;
1298
1299 for (rank, value) in pairs {
1300 if rank >= n {
1301 return Err(crate::Error::InvalidRankCardinality {
1305 expected: n,
1306 actual: rank + 1,
1307 });
1308 }
1309 if buf[rank].is_none() {
1310 filled += 1;
1311 }
1312 buf[rank] = Some(value); }
1314
1315 if filled != n {
1316 return Err(crate::Error::InvalidRankCardinality {
1318 expected: n,
1319 actual: filled,
1320 });
1321 }
1322
1323 let ranks: Vec<T> = buf.into_iter().map(Option::unwrap).collect();
1325 Ok(ValueMesh::new_unchecked(region, ranks))
1326 }
1327
/// Deterministically scrambles an index into a 64-bit key.
///
/// The input is xored with a fixed constant and then passed through two
/// xor-shift/multiply rounds and a final xor-shift (the mixing constants are
/// the ones known from SplitMix64).
fn hash_key(x: usize) -> u64 {
    const SEED: u64 = 0x9E3779B97F4A7C15;
    const MUL_A: u64 = 0xBF58476D1CE4E5B9;
    const MUL_B: u64 = 0x94D049BB133111EB;

    let seeded = (x as u64) ^ SEED;
    let round_a = (seeded ^ (seeded >> 30)).wrapping_mul(MUL_A);
    let round_b = (round_a ^ (round_a >> 27)).wrapping_mul(MUL_B);
    round_b ^ (round_b >> 31)
}
1341
/// Deterministically permutes `v` in place.
///
/// Indices are ordered by `key(index)` (ties keep index order); afterwards
/// position `j` holds the element that was at the `j`-th index of that
/// ordering. The permutation is applied cycle by cycle using swaps.
fn pseudo_shuffle<'a, T: 'a>(v: &'a mut [T], key: impl Fn(usize) -> u64 + Copy) {
    let len = v.len();

    // Evaluate each key once, then stably sort the indices by key.
    let keys: Vec<u64> = (0..len).map(key).collect();
    let mut perm: Vec<usize> = (0..len).collect();
    perm.sort_by_key(|&idx| keys[idx]);

    // Follow every cycle of the permutation exactly once.
    let mut visited = vec![false; len];
    for start in 0..len {
        let mut cur = start;
        while !visited[cur] {
            visited[cur] = true;
            let next = perm[cur];
            if next == start {
                // The cycle is closed; the last element is already in place.
                break;
            }
            v.swap(cur, next);
            cur = next;
        }
    }
}
1384
1385 proptest! {
1401 #[test]
1402 fn try_collect_opt_equivalence(region in gen_region(1..=4, 6), extra_len in 0usize..=12) {
1403 let n = region.num_ranks();
1404
1405 let mut pairs: Vec<(usize, i64)> = (0..n).map(|r| (r, r as i64)).collect();
1407
1408 let extras = proptest::collection::vec(0..n, extra_len)
1411 .new_tree(&mut proptest::test_runner::TestRunner::default())
1412 .unwrap()
1413 .current();
1414 for (k, r) in extras.into_iter().enumerate() {
1415 pairs.push((r, (n as i64) + (k as i64)));
1416 }
1417
1418 pseudo_shuffle(&mut pairs, hash_key);
1421
1422 let mesh_ref = build_value_mesh_indexed(region.clone(), pairs.clone()).unwrap();
1424 let mesh_opt = pairs.into_iter().collect_indexed::<ValueMesh<_>>(region.clone()).unwrap();
1425
1426 prop_assert_eq!(mesh_ref.region(), mesh_opt.region());
1427 prop_assert_eq!(mesh_ref.values().collect::<Vec<_>>(), mesh_opt.values().collect::<Vec<_>>());
1428 }
1429 }
1430
1431 proptest! {
1442 #[test]
1443 fn try_collect_opt_missing_rank_errors_match(region in gen_region(1..=4, 6)) {
1444 let n = region.num_ranks();
1445 let mut pairs: Vec<(usize, i64)> = (0..n).map(|r| (r, r as i64)).collect();
1447 if n > 0 {
1449 let drop_idx = 0usize; pairs.remove(drop_idx);
1451 }
1452 pseudo_shuffle(&mut pairs, hash_key);
1454
1455 let ref_err = build_value_mesh_indexed(region.clone(), pairs.clone()).unwrap_err();
1456 let opt_err = pairs.into_iter().collect_indexed::<ValueMesh<_>>(region).unwrap_err();
1457 assert_eq!(format!("{ref_err:?}"), format!("{opt_err:?}"));
1458 }
1459 }
1460
1461 proptest! {
1473 #[test]
1474 fn try_collect_opt_out_of_bound_errors_match(region in gen_region(1..=4, 6)) {
1475 let n = region.num_ranks();
1476 let mut pairs = vec![(0usize, 0i64), (n, 123i64)];
1478 pseudo_shuffle(&mut pairs, hash_key);
1479
1480 let ref_err = build_value_mesh_indexed(region.clone(), pairs.clone()).unwrap_err();
1481 let opt_err = pairs.into_iter().collect_indexed::<ValueMesh<_>>(region).unwrap_err();
1482 assert_eq!(format!("{ref_err:?}"), format!("{opt_err:?}"));
1483 }
1484 }
1485
/// `map_into` keeps the region and applies the closure rank by rank.
#[test]
fn map_into_preserves_region_and_order() {
    let region: Region = extent!(rows = 2, cols = 3).into();
    let source = ValueMesh::new_unchecked(region.clone(), vec![0, 1, 2, 3, 4, 5]);

    let doubled: ValueMesh<_> = source.map_into(|x| x * 2);

    assert_eq!(doubled.region, region);
    let seen: Vec<_> = doubled.values().collect();
    assert_eq!(seen, vec![0, 2, 4, 6, 8, 10]);
}
1498
/// `map_into` can project each string to its length, keeping the region.
#[test]
fn map_into_ref_borrows_and_preserves() {
    let region: Region = extent!(n = 4).into();
    let words = vec!["a".to_string(), "b".into(), "c".into(), "d".into()];
    let source = ValueMesh::new_unchecked(region.clone(), words);

    let lens: ValueMesh<_> = source.map_into(|s| s.len());

    assert_eq!(lens.region, region);
    let seen: Vec<_> = lens.values().collect();
    assert_eq!(seen, vec![1, 1, 1, 1]);
}
1511
/// `try_map_into` returns the closure's error when one value fails.
#[test]
fn try_map_into_short_circuits_on_error() {
    let region = extent!(n = 4).into();
    let source = ValueMesh::new_unchecked(region, vec![1, 2, 3, 4]);

    let outcome: Result<ValueMesh<i32>, &'static str> =
        source.try_map_into(|x| if x == &3 { Err("boom") } else { Ok(x + 10) });

    assert!(outcome.is_err());
    assert_eq!(outcome.unwrap_err(), "boom");
}
1523
/// `try_map_into` yields the closure's error for the failing value (here
/// the value `3`).
#[test]
fn try_map_into_ref_short_circuits_on_error() {
    let region = extent!(n = 4).into();
    let source = ValueMesh::new_unchecked(region, vec![1, 2, 3, 4]);

    let outcome: Result<ValueMesh<i32>, &'static str> =
        source.try_map_into(|x| if x == &3 { Err("boom") } else { Ok(x + 10) });

    assert!(outcome.is_err());
    assert_eq!(outcome.unwrap_err(), "boom");
}
1535
/// Creates a `Waker` whose wake, wake-by-ref and drop operations do nothing
/// and whose clone is another such waker.
fn noop_waker() -> Waker {
    fn noop(_: *const ()) {}
    fn raw() -> RawWaker {
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    fn duplicate(_: *const ()) -> RawWaker {
        raw()
    }
    static VTABLE: RawWakerVTable = RawWakerVTable::new(duplicate, noop, noop, noop);

    // SAFETY: the vtable functions never dereference the (null) data
    // pointer and own no resources, so the `RawWaker` contract holds.
    unsafe { Waker::from_raw(raw()) }
}
1551
1552 fn poll_now<F: Future>(mut fut: F) -> F::Output {
1553 let waker = noop_waker();
1554 let mut cx = Context::from_waker(&waker);
1555 let mut fut = unsafe { Pin::new_unchecked(&mut fut) };
1560 match fut.as_mut().poll(&mut cx) {
1561 Poll::Ready(v) => v,
1562 Poll::Pending => unreachable!("Ready futures must complete immediately"),
1563 }
1564 }
/// `map_into` can produce a mesh of futures; polling each one yields the
/// mapped value in rank order.
#[test]
fn map_into_ready_futures() {
    let region: Region = extent!(r = 2, c = 2).into();
    let source = ValueMesh::new_unchecked(region.clone(), vec![10, 20, 30, 40]);

    let pending: ValueMesh<core::future::Ready<_>> =
        source.map_into(|x| core::future::ready(x + 1));
    assert_eq!(pending.region, region);

    let mut results = Vec::new();
    for fut in pending.values() {
        results.push(poll_now(fut.clone()));
    }
    assert_eq!(results, vec![11, 21, 31, 41]);
}
1581
/// `map_into` works on a mesh with a single rank.
#[test]
fn map_into_single_element_mesh() {
    let region: Region = extent!(n = 1).into();
    let source = ValueMesh::new_unchecked(region.clone(), vec![7]);

    let squared: ValueMesh<_> = source.map_into(|x| x * x);

    assert_eq!(squared.region, region);
    let seen: Vec<_> = squared.values().collect();
    assert_eq!(seen, vec![49]);
}
1591
/// `map_into` can project out of values that contain a non-`Clone` field.
#[test]
fn map_into_ref_with_non_clone_field() {
    // Deliberately not `Clone`.
    #[derive(Debug, PartialEq, Eq)]
    struct NotClone(i32);

    let region: Region = extent!(x = 3).into();
    let values = vec![(10, NotClone(1)), (20, NotClone(2)), (30, NotClone(3))];
    let mesh: ValueMesh<(i32, NotClone)> =
        values.into_iter().collect_mesh(region.clone()).unwrap();

    let projected: ValueMesh<i32> = mesh.map_into(|t| t.0);
    assert_eq!(projected.values().collect::<Vec<_>>(), vec![10, 20, 30]);
    assert_eq!(projected.region(), &region);
}
1607
/// Compressing a mesh of identical values keeps every lookup intact.
#[test]
fn rle_roundtrip_all_equal() {
    let region: Region = extent!(n = 6).into();
    let mut vm = ValueMesh::new_unchecked(region.clone(), vec![42; 6]);

    vm.compress_adjacent_in_place();

    let expanded: Vec<_> = vm.values().collect();
    assert_eq!(expanded, vec![42, 42, 42, 42, 42, 42]);

    let rank_count = region.num_ranks();
    for rank in 0..rank_count {
        assert_eq!(vm.get(rank), Some(&42));
    }
    // One past the last rank is out of range.
    assert_eq!(vm.get(rank_count), None);
}
1624
/// Compressing alternating values (no two neighbours equal) loses nothing.
#[test]
fn rle_roundtrip_alternating() {
    let region: Region = extent!(n = 6).into();
    let original = vec![1, 2, 1, 2, 1, 2];
    let mut vm = ValueMesh::new_unchecked(region.clone(), original.clone());

    vm.compress_adjacent_in_place();

    let expanded: Vec<_> = vm.values().collect();
    assert_eq!(expanded, original);

    // Spot-check random access.
    assert_eq!(vm.get(0), Some(&1));
    for rank in [1, 3, 5] {
        assert_eq!(vm.get(rank), Some(&2));
    }
}
1641
/// A compressed mesh round-trips and can be sliced to a sub-range.
#[test]
fn rle_roundtrip_blocky_and_slice() {
    let region: Region = extent!(n = 10).into();
    let original = vec![0, 0, 0, 1, 1, 2, 2, 2, 2, 3];
    let mut vm = ValueMesh::new_unchecked(region.clone(), original.clone());

    vm.compress_adjacent_in_place();
    let expanded: Vec<_> = vm.values().collect();
    assert_eq!(expanded, original);

    // Ranks 3..8 of the original are [1, 1, 2, 2, 2].
    let window = region.range("n", 3..8).unwrap();
    let sliced_values: Vec<_> = vm.sliced(window).values().collect();
    assert_eq!(sliced_values, vec![1, 1, 2, 2, 2]);
}
1659
/// Compressing twice yields the same values as compressing once.
#[test]
fn rle_idempotent_noop_on_second_call() {
    let region: Region = extent!(n = 7).into();
    let original = vec![9, 9, 9, 8, 8, 9, 9];
    let mut vm = ValueMesh::new_unchecked(region.clone(), original.clone());

    vm.compress_adjacent_in_place();
    assert_eq!(vm.values().collect::<Vec<_>>(), original);

    // Second call on an already compressed mesh.
    vm.compress_adjacent_in_place();
    assert_eq!(vm.values().collect::<Vec<_>>(), original);
}
1676
/// A mesh built with `collect_indexed` can be compressed afterwards.
#[test]
fn rle_works_after_build_indexed() {
    let region: Region = extent!(x = 2, y = 3).into();
    let shuffled = vec![(3, 30), (0, 0), (5, 50), (2, 20), (1, 10), (4, 40)];

    let mut vm = shuffled
        .into_iter()
        .collect_indexed::<ValueMesh<_>>(region.clone())
        .unwrap();
    vm.compress_adjacent_in_place();

    let expanded: Vec<_> = vm.values().collect();
    assert_eq!(expanded, vec![0, 10, 20, 30, 40, 50]);
    assert_eq!(vm.get(4), Some(&40));
}
1696
#[test]
fn rle_handles_singleton_mesh() {
    let region: Region = extent!(n = 1).into();
    let mut mesh = ValueMesh::new_unchecked(region, vec![123]);

    mesh.compress_adjacent_in_place();

    assert_eq!(mesh.values().collect::<Vec<_>>(), vec![123]);
    // Rank 0 is the only rank in the mesh; rank 1 must report `None`.
    assert_eq!(mesh.get(0), Some(&123));
    assert_eq!(mesh.get(1), None);
}
1708
#[test]
fn test_dense_round_trip_json() {
    let region: Region = extent!(x = 5).into();
    let mesh = ValueMesh::new(region, vec![1, 2, 3, 4, 5]).unwrap();

    let json = serde_json::to_string_pretty(&mesh).unwrap();

    // Decoding the JSON must give back an equal mesh.
    let decoded = serde_json::from_str::<ValueMesh<i32>>(&json).unwrap();
    assert_eq!(mesh, decoded);

    // The serialized form must carry a "Dense" entry under "rep".
    let tree = serde_json::from_str::<serde_json::Value>(&json).unwrap();
    let dense_tag = tree.get("rep").and_then(|rep| rep.get("Dense"));
    assert!(dense_tag.is_some(), "json is {}", json);
}
1727
#[test]
fn test_dense_round_trip_bincode() {
    let region: Region = extent!(x = 5).into();
    let mesh = ValueMesh::new(region, vec![1, 2, 3, 4, 5]).unwrap();

    let bytes = bincode::serialize(&mesh).unwrap();
    let restored = bincode::deserialize::<ValueMesh<i32>>(&bytes).unwrap();

    // Same contents, and the decoded mesh is still in the dense
    // representation.
    assert_eq!(mesh, restored);
    assert!(matches!(restored.rep, Rep::Dense { .. }));
}
1741
#[test]
fn test_compressed_round_trip_json() {
    let region: Region = extent!(x = 10).into();
    let mut mesh = ValueMesh::new(region, vec![1, 1, 1, 2, 2, 3, 3, 3, 3, 3]).unwrap();
    mesh.compress_adjacent_in_place();

    let json = serde_json::to_string_pretty(&mesh).unwrap();

    // Decoding the JSON must give back an equal mesh.
    let decoded = serde_json::from_str::<ValueMesh<i32>>(&json).unwrap();
    assert_eq!(mesh, decoded);

    // The serialized form must carry a "Compressed" entry under "rep".
    let tree = serde_json::from_str::<serde_json::Value>(&json).unwrap();
    let compressed_tag = tree.get("rep").and_then(|rep| rep.get("Compressed"));
    assert!(compressed_tag.is_some(), "json is {}", json);
}
1762
#[test]
fn test_compressed_round_trip_bincode() {
    let region: Region = extent!(x = 10).into();
    let mut mesh = ValueMesh::new(region, vec![1, 1, 1, 2, 2, 3, 3, 3, 3, 3]).unwrap();
    mesh.compress_adjacent_in_place();

    let bytes = bincode::serialize(&mesh).unwrap();
    let restored = bincode::deserialize::<ValueMesh<i32>>(&bytes).unwrap();

    // Same contents, and the decoded mesh is still in the compressed
    // representation.
    assert_eq!(mesh, restored);
    assert!(matches!(restored.rep, Rep::Compressed { .. }));
}
1779
#[test]
fn test_stable_run_encoding() {
    let run = Run::new(0, 10, 42);

    // A JSON round trip must preserve the run.
    let encoded = serde_json::to_string(&run).unwrap();
    let decoded = serde_json::from_str::<Run>(&encoded).unwrap();
    assert_eq!(run, decoded);

    // The fields hold exactly what was handed to the constructor.
    assert_eq!((run.start, run.end, run.id), (0, 10, 42));

    // Converting into a `(range, id)` pair yields the same bounds and id.
    let pair: (Range<usize>, u32) = run.try_into().unwrap();
    assert_eq!(pair, (0..10, 42));
}
1796
#[test]
fn from_single_builds_single_run() {
    let region: Region = extent!(n = 6).into();
    let vm = ValueMesh::from_single(region.clone(), 7);

    // The mesh reports the region it was built with. (The borrow was
    // previously mis-encoded as `®ion`, which does not compile.)
    assert_eq!(vm.region(), &region);

    // Every rank observes the single fill value.
    assert_eq!(vm.values().collect::<Vec<_>>(), vec![7, 7, 7, 7, 7, 7]);

    // Random access at both ends, and one past the end.
    assert_eq!(vm.get(0), Some(&7));
    assert_eq!(vm.get(5), Some(&7));
    assert_eq!(vm.get(6), None);
}
1808
#[test]
fn from_default_builds_with_default_value() {
    let region: Region = extent!(n = 6).into();
    let vm = ValueMesh::<i32>::from_default(region.clone());

    // The mesh reports the region it was built with. (The borrow was
    // previously mis-encoded as `®ion`, which does not compile.)
    assert_eq!(vm.region(), &region);

    // Every rank is expected to hold 0, the default value of `i32`.
    assert_eq!(vm.values().collect::<Vec<_>>(), vec![0, 0, 0, 0, 0, 0]);
    assert_eq!(vm.get(0), Some(&0));
    assert_eq!(vm.get(5), Some(&0));
}
1820
#[test]
fn test_default_vs_single_equivalence() {
    let region: Region = extent!(x = 4).into();

    // A default-built `i32` mesh is expected to equal one explicitly
    // filled with 0 over the same region.
    let from_default = ValueMesh::<i32>::from_default(region.clone());
    let from_zero = ValueMesh::from_single(region, 0);
    assert_eq!(from_default, from_zero);
}
1828
#[test]
fn build_from_ranges_with_default_basic() {
    let region: Region = extent!(n = 10).into();
    // Default value 0, overridden on ranks 2..4 (value 1) and 6..9 (value 2).
    let vm = ValueMesh::from_ranges_with_default(
        region.clone(),
        0,
        vec![(2..4, 1), (6..9, 2)],
    )
    .unwrap();

    // The mesh reports the region it was built with. (The borrow was
    // previously mis-encoded as `®ion`, which does not compile.)
    assert_eq!(vm.region(), &region);

    // Ranks outside the supplied ranges observe the default.
    assert_eq!(
        vm.values().collect::<Vec<_>>(),
        vec![0, 0, 1, 1, 0, 0, 2, 2, 2, 0]
    );

    // The result must be in the compressed representation: at most three
    // table entries (values 0, 1, 2) and five runs
    // (0..2, 2..4, 4..6, 6..9, 9..10).
    match &vm.rep {
        Rep::Compressed { table, runs } => {
            assert!(table.len() <= 3);
            assert_eq!(runs.len(), 5);
        }
        _ => panic!("expected compressed"),
    }
}
1855
#[test]
fn build_from_ranges_with_default_edge_cases() {
    let five: Region = extent!(n = 5).into();

    // One range covering every rank: the default (9) never shows up.
    let covered = ValueMesh::from_ranges_with_default(five.clone(), 9, vec![(0..5, 3)]).unwrap();
    assert_eq!(covered.values().collect::<Vec<_>>(), vec![3, 3, 3, 3, 3]);

    // Two touching ranges carrying the same value, default on either side.
    let touching =
        ValueMesh::from_ranges_with_default(five, 0, vec![(1..2, 7), (2..4, 7)]).unwrap();
    assert_eq!(touching.values().collect::<Vec<_>>(), vec![0, 7, 7, 7, 0]);

    // Zero-rank region with no ranges: construction succeeds and there is
    // nothing to iterate.
    let no_ranks: Region = extent!(n = 0).into();
    let hollow = ValueMesh::from_ranges_with_default(no_ranks, 42, vec![]).unwrap();
    assert_eq!(hollow.values().collect::<Vec<_>>(), Vec::<i32>::new());
}
1874
#[test]
fn from_dense_builds_and_compresses() {
    let region: Region = extent!(n = 6).into();
    let mesh = ValueMesh::from_dense(region.clone(), vec![1, 1, 2, 2, 3, 3]).unwrap();

    // The mesh reports the region it was built with. (The borrow was
    // previously mis-encoded as `®ion`, which does not compile.)
    assert_eq!(mesh.region(), &region);

    // `from_dense` must hand back the compressed representation while
    // still exposing the original values.
    assert!(matches!(mesh.rep, Rep::Compressed { .. }));
    assert_eq!(mesh.values().collect::<Vec<_>>(), vec![1, 1, 2, 2, 3, 3]);

    // Random access into the first, middle and last block of values.
    assert_eq!(mesh.get(0), Some(&1));
    assert_eq!(mesh.get(3), Some(&2));
    assert_eq!(mesh.get(5), Some(&3));
}
1889
#[test]
fn merge_from_overlay_basic() {
    let region: Region = extent!(n = 8).into();
    let mut mesh = ValueMesh::from_dense(region, vec![1, 1, 1, 2, 2, 2, 3, 3]).unwrap();

    // Overwrite ranks 2..6 with 9. This clips the leading run of 1s and
    // replaces the whole run of 2s.
    let patch = ValueOverlay::try_from_runs(vec![(2..6, 9)]).unwrap();
    mesh.merge_from_overlay(patch).unwrap();

    assert_eq!(
        mesh.materialized_runs(),
        vec![(0..2, 1), (2..6, 9), (6..8, 3)]
    );
}
1907
#[test]
fn merge_from_overlay_multiple_spans() {
    let region: Region = extent!(m = 12).into();
    let base = vec![1, 1, 1, 2, 2, 2, 3, 3, 3, 4, 4, 4];
    let mut mesh = ValueMesh::from_dense(region, base).unwrap();

    // Two disjoint overlay spans: 2..6 becomes 9 and 9..11 becomes 8.
    let patch = ValueOverlay::try_from_runs(vec![(2..6, 9), (9..11, 8)]).unwrap();
    mesh.merge_from_overlay(patch).unwrap();

    // Base values survive only where no overlay span covers them.
    let expected = vec![(0..2, 1), (2..6, 9), (6..9, 3), (9..11, 8), (11..12, 4)];
    assert_eq!(mesh.materialized_runs(), expected);
}
1934
#[test]
fn merge_from_overlay_crosses_row_boundary() {
    // Two-dimensional region with ten ranks in total.
    let region: Region = extent!(rows = 2, cols = 5).into();
    let mut mesh = ValueMesh::from_dense(region, vec![1, 1, 1, 2, 2, 3, 3, 4, 4, 4]).unwrap();

    // Overlay on flat ranks 3..7. Assuming row-major rank order (TODO
    // confirm), that covers the end of row 0 and the start of row 1.
    let patch = ValueOverlay::try_from_runs(vec![(3..7, 9)]).unwrap();
    mesh.merge_from_overlay(patch).unwrap();

    // Check both the flat view and the run view of the merged mesh.
    assert_eq!(
        mesh.values().collect::<Vec<_>>(),
        vec![1, 1, 1, 9, 9, 9, 9, 4, 4, 4]
    );
    assert_eq!(
        mesh.materialized_runs(),
        vec![(0..3, 1), (3..7, 9), (7..10, 4)]
    );
}
1964
#[test]
fn accumulator_initializes_and_merges_overlays() {
    #[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Named)]
    enum Val {
        Init,
        Running,
        Stopped,
    }

    // The template acts as the accumulator: eight ranks, all `Init`.
    let region: Region = extent!(m = 8).into();
    let template = ValueMesh::from_single(region, Val::Init);

    // The accumulated state starts out as the empty default mesh.
    let mut state = ValueMesh::<Val>::default();
    assert_eq!(state.region().num_ranks(), 0);

    // Expected contents, updated in lock-step with each overlay.
    let mut expected = vec![Val::Init; 8];

    // First overlay: afterwards the state covers all eight ranks, with
    // `Init` everywhere the overlay did not touch.
    let first = ValueOverlay::try_from_runs(vec![(2..5, Val::Running)]).unwrap();
    template.accumulate(&mut state, first).unwrap();
    expected[2..5].fill(Val::Running);

    assert_eq!(state.region().num_ranks(), 8);
    assert_eq!(state.values().collect::<Vec<_>>(), expected);

    // Second overlay: merged on top of what is already there.
    let second = ValueOverlay::try_from_runs(vec![(5..7, Val::Stopped)]).unwrap();
    template.accumulate(&mut state, second).unwrap();
    expected[5..7].fill(Val::Stopped);

    assert_eq!(state.values().collect::<Vec<_>>(), expected);

    // The accumulator must advertise the overlay reducer for `Val`.
    let spec = template
        .reducer_spec()
        .expect("reducer_spec should be Some");
    let reducer_hash = <ValueOverlayReducer<Val> as Named>::typehash();
    assert_eq!(spec.typehash, reducer_hash);
}
2032
#[test]
fn value_overlay_reducer_merges_with_right_wins() {
    #[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Named)]
    enum Val {
        Init,
        Running,
        Stopped,
    }

    // Left operand covers 0..4 and 6..8; right operand covers 2..6, so the
    // two overlap on 2..4.
    let lhs =
        ValueOverlay::try_from_runs(vec![(0..4, Val::Running), (6..8, Val::Stopped)]).unwrap();
    let rhs = ValueOverlay::try_from_runs(vec![(2..6, Val::Init)]).unwrap();

    let combined = ValueOverlayReducer::<Val>(PhantomData)
        .reduce(lhs, rhs)
        .unwrap();

    // On the overlap the right-hand value is expected to win; the
    // untouched parts of the left operand are kept.
    let expected = vec![
        (0..2, Val::Running),
        (2..6, Val::Init),
        (6..8, Val::Stopped),
    ];
    assert_eq!(combined.runs().cloned().collect::<Vec<_>>(), expected);
}
2070}