controller/
history.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9use std::cmp::Ordering;
10use std::collections::BTreeMap;
11use std::collections::HashMap;
12use std::collections::HashSet;
13
14use hyperactor::clock::Clock;
15use hyperactor::data::Serialized;
16use monarch_messages::client::Exception;
17use monarch_messages::controller::Seq;
18use monarch_messages::controller::WorkerError;
19use monarch_messages::worker::Ref;
20
/// An invocation tracks a discrete node in the graph of operations executed by
/// the worker based on instructions from the client.
/// It is useful for tracking the dependencies of an operation and propagating
/// failures. In the future this will be used with more data dependency tracking
/// to support better failure handling.
// Allowing dead code until we do something smarter with defs, uses etc.
#[allow(dead_code)]
#[derive(Debug)]
struct Invocation {
    /// The sequence number of the invocation. This should be unique and increasing across all
    /// invocations.
    seq: Seq,
    /// The references that this invocation defines or redefines. Effectively the
    /// output of the invocation.
    defs: Vec<Ref>,
    /// The references that this invocation uses. Effectively the input of the invocation.
    uses: Vec<Ref>,
    /// The result of the invocation. This is set when the invocation is completed or
    /// when a failure is inferred. Once set, the result is only replaced when a failure
    /// supersedes a success (see `set_result`); a recorded failure is never replaced,
    /// not even by a later successful result.
    result: Option<Result<Serialized, Exception>>,
    /// The seqs for the invocations that depend on this invocation. Useful for propagating failures.
    users: HashSet<Seq>,
    /// If we have reported the result eagerly, we want to make sure to not double report. This also
    /// lets us know when we can stop traversing when finding unreported results
    reported: bool,
}
47
48impl Invocation {
49    fn new(seq: Seq, uses: Vec<Ref>, defs: Vec<Ref>) -> Self {
50        Self {
51            seq,
52            uses,
53            defs,
54            result: None,
55            users: HashSet::new(),
56            reported: false,
57        }
58    }
59
60    fn add_user(&mut self, user: Seq) {
61        self.users.insert(user);
62    }
63
64    /// Invocation results can only go from valid to failed, or be
65    /// set if the invocation result is empty.
66    fn set_result(&mut self, result: Result<Serialized, Exception>) {
67        if self.result.is_none() || matches!((&self.result, &result), (Some(Ok(_)), Err(_))) {
68            self.result = Some(result);
69        }
70    }
71
72    fn set_exception(&mut self, exception: Exception) {
73        match exception {
74            Exception::Error(_, caused_by, error) => {
75                self.set_result(Err(Exception::Error(self.seq, caused_by, error)));
76            }
77            Exception::Failure(_) => {
78                tracing::error!(
79                    "system failures {:?} can never be assigned for an invocation",
80                    exception
81                );
82            }
83        }
84    }
85
86    fn exception(&self) -> Option<&Exception> {
87        self.result
88            .as_ref()
89            .map(Result::as_ref)
90            .and_then(Result::err)
91    }
92
93    #[allow(dead_code)]
94    fn value(&self) -> Option<&Serialized> {
95        self.result
96            .as_ref()
97            .map(Result::as_ref)
98            .and_then(Result::ok)
99    }
100}
101
/// What the history currently records for a ref in `History::invocation_for_ref`.
#[derive(Debug, PartialEq)]
enum RefStatus {
    // The invocation for this ref is still in progress. Holds the seq of the
    // invocation that most recently defined the ref.
    Invoked(Seq),
    // The invocation for this ref has errored. Holds the exception that was
    // recorded for the ref.
    Errored(Exception),
}
109
/// The history of invocations sent by the client to be executed on the workers.
/// This is used to track dependencies between invocations and to propagate exceptions.
/// It purges history for completed invocations to avoid memory bloat.
/// TODO: Revisit this setup around purging refs automatically once we start doing
/// more complex data dependency tracking. We will want to be more aware of things like
/// borrows, drops etc. directly.
#[derive(Debug)]
#[allow(dead_code)]
pub struct History {
    /// The first incomplete Seq for each rank. This is used to determine which
    /// Seqs are no longer relevant and can be purged from the history.
    first_incomplete_seqs: MinVector<Seq>,
    /// The minimum incomplete Seq across all ranks.
    min_incomplete_seq: Seq,
    /// A map of seq to the invocation that it represents.
    invocations: HashMap<Seq, Invocation>,
    /// A map of reference to the seq for the invocation that defines it. This is used to
    /// compute dependencies between invocations.
    invocation_for_ref: HashMap<Ref, RefStatus>,
    // Refs whose deletion was requested through `delete_invocations_for_refs` but whose
    // entry in `invocation_for_ref` could not be dropped yet. They are re-examined on
    // every later call to `delete_invocations_for_refs`.
    marked_for_deletion: HashSet<Ref>,
    // Last seq to be invoked (`None` until the first invocation is added).
    max_seq: OptionSeq,
    // The first incomplete Seq for each rank derived from both client and controller request_status messages.
    // This is needed because the client may retain invocations past the time completed such as in call_fetch_shard().result()
    first_incomplete_seqs_controller: MinVector<Seq>,
    // Memoized minimum incomplete Seq across all ranks of first_incomplete_seqs_controller
    min_incompleted_seq_controller: Seq,
    // The deadline for the next expected completed seq. This is updated only when the previous deadline
    // has been met.
    //
    // Tuple fields are:
    // - the seq we expect to be completed
    // - the deadline
    // - if it has already been reported to the client
    deadline: Option<(Seq, tokio::time::Instant, bool)>,
}
147
/// A vector that also keeps a running tally of its values so the minimum can
/// be read without scanning the data.
#[derive(Debug)]
struct MinVector<T> {
    /// The stored values, addressed by index.
    data: Vec<T>,
    /// Number of occurrences of each distinct value in `data`. The smallest key
    /// is the current minimum.
    value_counts: BTreeMap<T, usize>,
}

impl<T> MinVector<T>
where
    T: Ord + Copy,
{
    /// Wraps `data`, tallying how often each value occurs.
    fn new(data: Vec<T>) -> Self {
        let value_counts = data.iter().fold(BTreeMap::new(), |mut tally, &item| {
            *tally.entry(item).or_insert(0usize) += 1;
            tally
        });
        Self { data, value_counts }
    }

    /// Replaces the value at `index` with `value` and updates the tally.
    ///
    /// Panics if `index` is out of bounds.
    fn set(&mut self, index: usize, value: T) {
        let previous = std::mem::replace(&mut self.data[index], value);

        // Release one occurrence of the value that was overwritten, dropping the
        // key entirely once nothing refers to it any more.
        if let std::collections::btree_map::Entry::Occupied(mut slot) =
            self.value_counts.entry(previous)
        {
            *slot.get_mut() -= 1;
            if *slot.get() == 0 {
                slot.remove();
            }
        }

        // Account for the value that was just written.
        *self.value_counts.entry(value).or_default() += 1;
    }

    /// Returns the value at `index`. Panics if `index` is out of bounds.
    fn get(&self, index: usize) -> T {
        self.data[index]
    }

    /// Returns the smallest stored value. Panics if the vector is empty.
    fn min(&self) -> T {
        let (smallest, _) = self.value_counts.first_key_value().unwrap();
        *smallest
    }

    /// Number of stored values.
    fn len(&self) -> usize {
        self.vec().len()
    }

    /// Borrows the underlying values in index order.
    fn vec(&self) -> &Vec<T> {
        &self.data
    }
}
199
200impl History {
201    pub fn new(world_size: usize) -> Self {
202        Self {
203            first_incomplete_seqs: MinVector::new(vec![Seq::default(); world_size]),
204            min_incomplete_seq: Seq::default(),
205            invocation_for_ref: HashMap::new(),
206            invocations: HashMap::new(),
207            marked_for_deletion: HashSet::new(),
208            max_seq: OptionSeq::from(None),
209            first_incomplete_seqs_controller: MinVector::new(vec![Seq::default(); world_size]),
210            min_incompleted_seq_controller: Seq::default(),
211            deadline: None,
212        }
213    }
214
    /// Test-only view of the per-rank first incomplete seqs, as last set by
    /// `rank_completed`. The slice is indexed by rank.
    #[cfg(test)]
    pub fn first_incomplete_seqs(&self) -> &[Seq] {
        self.first_incomplete_seqs.vec()
    }
219
    /// Per-rank first incomplete seqs as maintained by `update_deadline_tracking`.
    /// The slice is indexed by rank.
    pub fn first_incomplete_seqs_controller(&self) -> &[Seq] {
        self.first_incomplete_seqs_controller.vec()
    }
223
    /// The memoized minimum over `first_incomplete_seqs_controller`, refreshed by
    /// `update_deadline_tracking`.
    pub fn min_incomplete_seq_reported(&self) -> Seq {
        self.min_incompleted_seq_controller
    }
227
    /// Number of ranks tracked by this history (the `world_size` passed to `new`).
    pub fn world_size(&self) -> usize {
        self.first_incomplete_seqs.len()
    }
231
232    pub fn delete_invocations_for_refs(&mut self, refs: Vec<Ref>) {
233        self.marked_for_deletion.extend(refs);
234
235        self.marked_for_deletion
236            .retain(|ref_| match self.invocation_for_ref.get(ref_) {
237                Some(RefStatus::Invoked(seq)) => {
238                    if seq < &self.min_incomplete_seq {
239                        self.invocation_for_ref.remove(ref_);
240                        false
241                    } else {
242                        true
243                    }
244                }
245                Some(RefStatus::Errored(_)) => {
246                    self.invocation_for_ref.remove(ref_);
247                    false
248                }
249                None => true,
250            });
251    }
252
253    /// Add an invocation to the history.
254    pub fn add_invocation(
255        &mut self,
256        seq: Seq,
257        uses: Vec<Ref>,
258        defs: Vec<Ref>,
259    ) -> Vec<(Seq, Option<Result<Serialized, Exception>>)> {
260        let mut results = Vec::new();
261        let input_seq = OptionSeq::from(seq);
262        assert!(
263            input_seq >= self.max_seq,
264            "nonmonotonic seq: {:?}; current max: {:?}",
265            seq,
266            self.max_seq,
267        );
268        self.max_seq = input_seq;
269        let mut invocation = Invocation::new(seq, uses.clone(), defs.clone());
270
271        for use_ in uses {
272            // The invocation for every use_ should add this seq as a user.
273            match self.invocation_for_ref.get(&use_) {
274                Some(RefStatus::Errored(exception)) => {
275                    // We know that this invocation hasn't been completed yet, so we can
276                    // directly call set_exception on it.
277                    if !invocation.reported {
278                        invocation.set_exception(exception.clone());
279                        results.push((seq, Some(Err(exception.clone()))));
280                        invocation.reported = true;
281                    }
282                }
283                Some(RefStatus::Invoked(invoked_seq)) => {
284                    if let Some(invocation) = self.invocations.get_mut(invoked_seq) {
285                        invocation.add_user(seq)
286                    }
287                }
288                None => tracing::debug!(
289                    "ignoring dependency on potentially complete invocation for ref: {:?}",
290                    use_
291                ),
292            }
293        }
294        for def in defs {
295            self.invocation_for_ref.insert(
296                def,
297                match invocation.exception() {
298                    Some(err) => RefStatus::Errored(err.clone()),
299                    None => RefStatus::Invoked(seq.clone()),
300                },
301            );
302        }
303
304        self.invocations.insert(seq, invocation);
305
306        results
307    }
308
309    /// Propagate worker error to the invocation with the given Seq. This will also propagate
310    /// to all seqs that depend on this seq directly or indirectly.
311    pub fn propagate_exception(&mut self, seq: Seq, exception: Exception) {
312        let mut queue = vec![seq];
313        let mut visited = HashSet::new();
314
315        while let Some(seq) = queue.pop() {
316            if !visited.insert(seq) {
317                continue;
318            }
319
320            let Some(invocation) = self.invocations.get_mut(&seq) else {
321                continue;
322            };
323
324            // Overwrite the error, so we are using the last error for this invocation to send
325            // to the client.
326            for def in invocation.defs.iter() {
327                match self.invocation_for_ref.get(def) {
328                    Some(RefStatus::Invoked(invoked_seq)) if *invoked_seq == seq => self
329                        .invocation_for_ref
330                        .insert(*def, RefStatus::Errored(exception.clone())),
331                    _ => None,
332                };
333            }
334            invocation.set_exception(exception.clone());
335            queue.extend(invocation.users.iter());
336        }
337    }
338
339    fn find_unreported_dependent_exceptions(
340        &mut self,
341        seq: Seq,
342    ) -> Vec<(Seq, Option<Result<Serialized, Exception>>)> {
343        let mut queue = vec![seq];
344        let mut visited = HashSet::new();
345        let mut results = Vec::new();
346
347        while let Some(seq) = queue.pop() {
348            if !visited.insert(seq) {
349                continue;
350            }
351
352            let Some(invocation) = self.invocations.get_mut(&seq) else {
353                continue;
354            };
355
356            if !matches!(invocation.result, Some(Err(_))) || invocation.reported {
357                continue;
358            }
359
360            invocation.reported = true;
361
362            results.push((seq, invocation.result.clone()));
363
364            queue.extend(invocation.users.iter());
365        }
366        results
367    }
368
369    pub fn report_deadline_missed(&mut self) {
370        if let Some((seq, time, _)) = self.deadline {
371            self.deadline = Some((seq, time, true));
372        }
373    }
374
375    pub fn deadline(
376        &mut self,
377        expected_progress: u64,
378        timeout: tokio::time::Duration,
379        clock: &impl Clock,
380    ) -> Option<(Seq, tokio::time::Instant, bool)> {
381        let previous_deadline_completed = match self.deadline {
382            Some((expected_seq, ..)) => self.min_incompleted_seq_controller > expected_seq,
383            None => self.max_seq.inner().is_some(),
384        };
385
386        if previous_deadline_completed {
387            let next_expected_completed_seq = std::cmp::min(
388                OptionSeq::from(u64::from(self.min_incompleted_seq_controller) + expected_progress),
389                self.max_seq.clone(),
390            );
391
392            self.deadline =
393                next_expected_completed_seq
394                    .into_inner()
395                    .map(|next_expected_completed_seq| {
396                        (next_expected_completed_seq, clock.now() + timeout, false)
397                    });
398        }
399        self.deadline
400    }
401
402    pub fn update_deadline_tracking(&mut self, rank: usize, seq: Seq) {
403        // rank_completed also calls this so that we stay up to date with client request_status messages.
404        // However, controller request_status messages may be ahead of the client as the client may retain invocations
405        // past the time completed so we should take the max
406        self.first_incomplete_seqs_controller.set(
407            rank,
408            std::cmp::max(seq, self.first_incomplete_seqs.get(rank)),
409        );
410
411        self.min_incompleted_seq_controller = self.first_incomplete_seqs_controller.min();
412    }
413
414    /// Mark the given rank as completed up to but excluding the given Seq. This will also purge history for
415    /// any Seqs that are no longer relevant (completed on all ranks).
416    pub fn rank_completed(
417        &mut self,
418        rank: usize,
419        seq: Seq,
420    ) -> Vec<(Seq, Option<Result<Serialized, Exception>>)> {
421        self.first_incomplete_seqs.set(rank, seq);
422        let prev = self.min_incomplete_seq;
423        self.min_incomplete_seq = self.first_incomplete_seqs.min();
424        self.update_deadline_tracking(rank, seq);
425
426        let mut results: Vec<(Seq, Option<Result<Serialized, Exception>>)> = Vec::new();
427        for i in Seq::iter_between(prev, self.min_incomplete_seq) {
428            if let Some(invocation) = self.invocations.remove(&i) {
429                let retain = if let Some(result) = invocation.result {
430                    let is_err = result.is_err();
431                    if !invocation.reported {
432                        results.push((i, Some(result)));
433                    }
434                    is_err
435                } else {
436                    // Do not retain successful invocations.
437                    results.push((i, None));
438                    false
439                };
440
441                if retain {
442                    // Retain the def history because we may need it to propagate
443                    // errors in the future. We rely here on the fact that the invocation
444                    // above has been marked as failed by way of failure propagation.
445                    for def in &invocation.defs {
446                        match self.invocation_for_ref.get(def) {
447                            Some(RefStatus::Invoked(seq)) if *seq == i => {
448                                self.invocation_for_ref.remove(def)
449                            }
450                            _ => None,
451                        };
452                    }
453                }
454            }
455        }
456
457        // Propagate results to the client even if it is behind the completion frontier
458        // if we can determine for sure that it is completed
459        results.extend(self.find_unreported_dependent_exceptions(seq));
460
461        results
462    }
463
    /// Test-only lookup of the invocation stored for `seq`; `None` if the history
    /// holds no invocation for it (never added, or already purged).
    #[cfg(test)]
    fn get_invocation(&self, seq: Seq) -> Option<&Invocation> {
        self.invocations.get(&seq)
    }
468
469    pub fn set_result(&mut self, seq: Seq, result: Result<Serialized, WorkerError>) {
470        if let Some(invocation) = self.invocations.get_mut(&seq) {
471            invocation.set_result(result.map_err(|e| Exception::Error(seq, seq, e)));
472        }
473    }
474}
475
476/// Struct representing an optional `Seq`, where `None` is always considered the
477/// smallest. This type is to make it easier to compare `Option<Seq>` with `Seq`
478/// or `Option<Seq>`.
479#[derive(Clone, Debug, PartialEq, Eq)]
480pub(crate) struct OptionSeq(Option<Seq>);
481
482impl OptionSeq {
483    /// Return inner ref.
484    pub fn inner(&self) -> &Option<Seq> {
485        &self.0
486    }
487
488    /// Return inner.
489    pub fn into_inner(self) -> Option<Seq> {
490        self.0
491    }
492}
493
494impl PartialOrd for OptionSeq {
495    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
496        Some(self.cmp(other))
497    }
498}
499
500impl Ord for OptionSeq {
501    fn cmp(&self, other: &Self) -> Ordering {
502        match (self.0, other.0) {
503            (Some(a), Some(b)) => a.cmp(&b),
504            (Some(_), None) => Ordering::Greater,
505            (None, Some(_)) => Ordering::Less,
506            (None, None) => Ordering::Equal,
507        }
508    }
509}
510
511impl From<u64> for OptionSeq {
512    fn from(value: u64) -> Self {
513        OptionSeq(Some(Seq::from(value)))
514    }
515}
516
517impl From<Seq> for OptionSeq {
518    fn from(seq: Seq) -> Self {
519        OptionSeq(Some(seq))
520    }
521}
522
523impl From<Option<Seq>> for OptionSeq {
524    fn from(value: Option<Seq>) -> Self {
525        OptionSeq(value)
526    }
527}
528
529#[cfg(test)]
530mod tests {
531    use std::assert_matches::assert_matches;
532
533    use hyperactor::id;
534
535    use super::*;
536
537    struct InvocationUsersIterator<'a> {
538        history: &'a History,
539        stack: Vec<&'a Invocation>,
540        visited: HashSet<Seq>,
541    }
542
543    impl<'a> Iterator for InvocationUsersIterator<'a> {
544        type Item = &'a Invocation;
545
546        fn next(&mut self) -> Option<Self::Item> {
547            while let Some(invocation) = self.stack.pop() {
548                if !self.visited.insert(invocation.seq) {
549                    continue;
550                }
551                self.stack.extend(
552                    invocation
553                        .users
554                        .iter()
555                        .filter_map(|seq| self.history.invocations.get(seq)),
556                );
557                return Some(invocation);
558            }
559            None
560        }
561    }
562
563    impl History {
564        /// Get an iterator of Seqs that are users of (or dependent on the completion of) the given Seq.
565        /// This is useful for propagating failures. This will return an empty iterator if the given Seq is
566        /// not in the history. So this should be called before the invocation is marked as completed for the
567        /// given rank.
568        /// The Seq passed to this function will also be included in the iterator.
569        pub(crate) fn iter_users_transitive(&self, seq: Seq) -> impl Iterator<Item = Seq> + '_ {
570            let invocations = self
571                .invocations
572                .get(&seq)
573                .map_or(Vec::default(), |invocation| vec![invocation]);
574
575            InvocationUsersIterator {
576                history: self,
577                stack: invocations,
578                visited: HashSet::new(),
579            }
580            .map(|invocation| invocation.seq)
581        }
582    }
583
    /// Builds a small dependency graph and checks that the transitive users of an
    /// invocation stay visible until the invocation is complete on every rank.
    #[test]
    fn simple_history() {
        let mut history = History::new(2);
        // Dependency graph (seq: uses -> defs):
        //   0: []  -> r1, r2
        //   1: r1  -> r3
        //   2: r3  -> r4
        //   3: r3  -> r5
        //   4: r3  -> r6
        //   5: r4  -> r7
        //   6: r4  -> r8
        history.add_invocation(0.into(), vec![], vec![Ref { id: 1 }, Ref { id: 2 }]);
        history.add_invocation(1.into(), vec![Ref { id: 1 }], vec![Ref { id: 3 }]);
        history.add_invocation(2.into(), vec![Ref { id: 3 }], vec![Ref { id: 4 }]);
        history.add_invocation(3.into(), vec![Ref { id: 3 }], vec![Ref { id: 5 }]);
        history.add_invocation(4.into(), vec![Ref { id: 3 }], vec![Ref { id: 6 }]);
        history.add_invocation(5.into(), vec![Ref { id: 4 }], vec![Ref { id: 7 }]);
        history.add_invocation(6.into(), vec![Ref { id: 4 }], vec![Ref { id: 8 }]);

        // Everything from seq 1 onwards depends on seq 1 (which is included itself).
        let mut res = history
            .iter_users_transitive(1.into())
            .collect::<Vec<Seq>>();
        res.sort();
        assert_eq!(
            res,
            vec![1.into(), 2.into(), 3.into(), 4.into(), 5.into(), 6.into()]
        );

        // Only rank 0 has completed up to seq 2, so nothing is purged yet.
        history.rank_completed(0, 2.into());
        let mut res = history
            .iter_users_transitive(1.into())
            .collect::<Vec<Seq>>();
        res.sort();
        assert_eq!(
            res,
            vec![1.into(), 2.into(), 3.into(), 4.into(), 5.into(), 6.into()]
        );

        // Both ranks have now completed seqs 0 and 1, so seq 1 is purged and the
        // walk from it is empty.
        history.rank_completed(1, 2.into());
        let res = history
            .iter_users_transitive(1.into())
            .collect::<Vec<Seq>>();
        assert_eq!(res, vec![]);

        // Test that we can still add invocations after all ranks have completed that seq.
        history.add_invocation(7.into(), vec![Ref { id: 1 }], vec![]);
    }
623
    /// Checks that refs defined by errored invocations can be deleted from the
    /// ref history on request.
    #[test]
    fn delete_errored_invocations() {
        let mut history = History::new(1);
        // seq 0 defines r1 and r2; seq 1 uses r1 and defines r3.
        history.add_invocation(0.into(), vec![], vec![Ref { id: 1 }, Ref { id: 2 }]);
        history.add_invocation(1.into(), vec![Ref { id: 1 }], vec![Ref { id: 3 }]);
        // Failing seq 0 also fails its user seq 1, so r1, r2 and r3 all become errored.
        history.propagate_exception(
            0.into(),
            Exception::Error(
                0.into(),
                0.into(),
                WorkerError {
                    backtrace: "worker error happened".to_string(),
                    worker_actor_id: id!(test[234].testactor[6]),
                },
            ),
        );
        // Errored refs are dropped as soon as their deletion is requested.
        history.delete_invocations_for_refs(vec![Ref { id: 1 }, Ref { id: 2 }]);
        history.rank_completed(0, 1.into());
        // Only r3 is left.
        assert_eq!(history.invocation_for_ref.len(), 1);
        history.delete_invocations_for_refs(vec![Ref { id: 3 }]);
        history.rank_completed(0, 2.into());
        assert!(history.invocation_for_ref.is_empty());
    }
647
    /// Checks that when a ref is redefined, later users depend on the redefining
    /// invocation rather than on the original one.
    #[test]
    fn redefinitions() {
        let mut history = History::new(2);
        history.add_invocation(0.into(), vec![], vec![Ref { id: 1 }, Ref { id: 2 }]);
        history.add_invocation(1.into(), vec![Ref { id: 1 }], vec![Ref { id: 3 }]);
        history.add_invocation(2.into(), vec![Ref { id: 3 }], vec![Ref { id: 4 }]);

        // So far only seq 2 uses r3, which seq 1 defines.
        let mut res = history
            .iter_users_transitive(1.into())
            .collect::<Vec<Seq>>();
        res.sort();
        assert_eq!(res, vec![1.into(), 2.into()]);

        // seq 3 both uses and redefines r3; seq 4 then uses the redefined r3.
        history.add_invocation(3.into(), vec![Ref { id: 3 }], vec![Ref { id: 3 }]);
        history.add_invocation(4.into(), vec![Ref { id: 3 }], vec![Ref { id: 6 }]);
        // seqs 5 and 6 use r4 (defined by seq 2), so they are not users of seq 3.
        history.add_invocation(5.into(), vec![Ref { id: 4 }], vec![Ref { id: 7 }]);
        history.add_invocation(6.into(), vec![Ref { id: 4 }], vec![Ref { id: 8 }]);

        history.rank_completed(0, 2.into());
        history.rank_completed(1, 2.into());

        // Only seq 4 depends on the redefinition made by seq 3.
        let res = history
            .iter_users_transitive(3.into())
            .collect::<Vec<Seq>>();
        assert_eq!(res, vec![3.into(), 4.into()]);
    }
674
    /// Exercises `MinVector`: construction, indexed reads, and minimum tracking
    /// across a series of `set` calls.
    #[test]
    fn min_vector() {
        // Test initialization
        let data = vec![3, 1, 4, 1, 5, 9, 2, 6, 5, 3, 5];
        let mut min_vector = MinVector::new(data.clone());

        // Test length
        assert_eq!(min_vector.len(), data.len());

        // Test initial vector
        assert_eq!(min_vector.vec(), &data);

        // Test initial minimum
        assert_eq!(min_vector.min(), 1);

        // Test get method
        for (i, &value) in data.iter().enumerate() {
            assert_eq!(min_vector.get(i), value);
        }

        // Test set method and min update
        min_vector.set(0, 0); // Change first element to 0
        assert_eq!(min_vector.get(0), 0);
        assert_eq!(min_vector.min(), 0);
        min_vector.set(1, 7); // Change second element to 7
        assert_eq!(min_vector.get(1), 7);
        assert_eq!(min_vector.min(), 0);
        min_vector.set(0, 8); // Change first element to 8
        assert_eq!(min_vector.get(0), 8);
        // The only 0 is gone; index 3 still holds a 1.
        assert_eq!(min_vector.min(), 1); // Minimum should now be 1

        // Test setting a value that already exists
        min_vector.set(2, 5); // Change third element to 5
        assert_eq!(min_vector.get(2), 5);
        assert_eq!(min_vector.min(), 1);

        // Test setting a value below the current minimum
        min_vector.set(3, 0); // Change fourth element to 0
        assert_eq!(min_vector.get(3), 0);
        assert_eq!(min_vector.min(), 0);

        // Test setting all elements to the same value
        for i in 0..min_vector.len() {
            min_vector.set(i, 10);
        }
        assert_eq!(min_vector.min(), 10);
        assert_eq!(min_vector.vec(), &vec![10; min_vector.len()]);
    }
723
    /// Checks that a failure reaches every direct and indirect user of the failed
    /// invocation, that unrelated invocations stay clean, that invocations added
    /// later inherit the failure through errored refs, and that a success cannot
    /// replace a recorded failure.
    #[test]
    fn failure_propagation() {
        let mut history = History::new(2);

        // Dependency graph (seq: uses -> defs):
        //   0: []  -> r1, r2
        //   1: r1  -> r3
        //   2: r3  -> r4, r5
        //   3: r2  -> r6
        //   4: r5  -> r6 (redefines r6)
        history.add_invocation(0.into(), vec![], vec![Ref { id: 1 }, Ref { id: 2 }]);
        history.add_invocation(1.into(), vec![Ref { id: 1 }], vec![Ref { id: 3 }]);
        history.add_invocation(
            2.into(),
            vec![Ref { id: 3 }],
            vec![Ref { id: 4 }, Ref { id: 5 }],
        );
        history.add_invocation(3.into(), vec![Ref { id: 2 }], vec![Ref { id: 6 }]);
        history.add_invocation(4.into(), vec![Ref { id: 5 }], vec![Ref { id: 6 }]);

        // No error before propagation
        for i in 1..=3 {
            assert!(
                history
                    .get_invocation(i.into())
                    .unwrap()
                    .exception()
                    .is_none()
            );
        }

        // Failure happened to invocation 1, invocations 2, 4 should be marked as failed because they
        // depend on 1 directly or indirectly.
        history.propagate_exception(
            1.into(),
            Exception::Error(
                1.into(),
                1.into(),
                WorkerError {
                    backtrace: "worker error happened".to_string(),
                    worker_actor_id: "test[234].testactor[6]".parse().unwrap(),
                },
            ),
        );

        // Error should be set for all invocations that depend on the failed invocation
        for i in [1, 2, 4] {
            assert!(
                history
                    .get_invocation(i.into())
                    .unwrap()
                    .exception()
                    .is_some()
            );
        }

        // Error should not be set for invocations that do not depend on the failed invocation
        for i in [0, 3] {
            assert!(
                history
                    .get_invocation(i.into())
                    .unwrap()
                    .exception()
                    .is_none()
            );
        }

        // A failed but completed invocation should still lead to all its
        // users being marked as failed even if they appear in the future.

        // Delete until 2.
        history.rank_completed(0, 2.into());
        history.rank_completed(1, 2.into());

        // r3 (seq 1), r4 and r5 (seq 2) and r6 (last defined by seq 4) are all errored.
        for i in [3, 4, 5, 6] {
            assert_matches!(
                history.invocation_for_ref.get(&i.into()),
                Some(RefStatus::Errored(_)),
            );
            // Invocation should start from 5, so i+2
            history.add_invocation((i + 2).into(), vec![Ref { id: i }], vec![Ref { id: 7 }]);
            assert!(
                history
                    .get_invocation((i + 2).into())
                    .unwrap()
                    .exception()
                    .is_some()
            );
        }

        // Test if you can fill a valid result on an errored invocation 2.
        history.set_result(
            2.into(),
            Ok(Serialized::serialize(&"2".to_string()).unwrap()),
        );
        // check that seq 2 is still errored
        assert!(
            history
                .get_invocation((2).into())
                .unwrap()
                .exception()
                .is_some()
        );
        assert!(
            history
                .get_invocation((2).into())
                .unwrap()
                .value()
                .is_none()
        );
    }
829
    /// Checks `OptionSeq` equality across its `From` conversions and that `None`
    /// orders below every `Some` value.
    #[test]
    fn test_option_seq_comparision() {
        // All three conversions of the same value compare equal.
        assert_eq!(OptionSeq::from(None), OptionSeq::from(None));
        assert_eq!(OptionSeq::from(1), OptionSeq::from(Seq::from(1)));
        assert_eq!(OptionSeq::from(1), OptionSeq::from(Some(Seq::from(1))));

        // `None` is the smallest; `Some` values order by their seq.
        assert!(OptionSeq::from(None) < OptionSeq::from(0));
        assert!(OptionSeq::from(0) < OptionSeq::from(1));

        assert!(OptionSeq::from(0) > OptionSeq::from(None));
        assert!(OptionSeq::from(1) > OptionSeq::from(0));
    }
842}