1use std::cmp::Ordering;
10use std::collections::BTreeMap;
11use std::collections::HashMap;
12use std::collections::HashSet;
13
14use hyperactor::clock::Clock;
15use hyperactor::data::Serialized;
16use monarch_messages::client::Exception;
17use monarch_messages::controller::Seq;
18use monarch_messages::controller::WorkerError;
19use monarch_messages::worker::Ref;
20
/// Bookkeeping for one invocation, identified by its `seq`, while the history
/// still tracks it.
// `uses` is only stored (never read in this file), which is presumably why
// `dead_code` is allowed here.
#[allow(dead_code)]
#[derive(Debug)]
struct Invocation {
    /// Sequence number of this invocation.
    seq: Seq,
    /// Refs this invocation defines; `add_invocation` maps each of them to
    /// this invocation in `invocation_for_ref`.
    defs: Vec<Ref>,
    /// Refs this invocation reads.
    uses: Vec<Ref>,
    /// Outcome once known: `Ok` with a value, or `Err` with the exception.
    result: Option<Result<Serialized, Exception>>,
    /// Seqs of later invocations that use one of this invocation's defs.
    users: HashSet<Seq>,
    /// Whether this invocation's (error) result has already been handed back
    /// to a caller for reporting, so it is not reported twice.
    reported: bool,
}
47
48impl Invocation {
49 fn new(seq: Seq, uses: Vec<Ref>, defs: Vec<Ref>) -> Self {
50 Self {
51 seq,
52 uses,
53 defs,
54 result: None,
55 users: HashSet::new(),
56 reported: false,
57 }
58 }
59
60 fn add_user(&mut self, user: Seq) {
61 self.users.insert(user);
62 }
63
64 fn set_result(&mut self, result: Result<Serialized, Exception>) {
67 if self.result.is_none() || matches!((&self.result, &result), (Some(Ok(_)), Err(_))) {
68 self.result = Some(result);
69 }
70 }
71
72 fn set_exception(&mut self, exception: Exception) {
73 match exception {
74 Exception::Error(_, caused_by, error) => {
75 self.set_result(Err(Exception::Error(self.seq, caused_by, error)));
76 }
77 Exception::Failure(_) => {
78 tracing::error!(
79 "system failures {:?} can never be assigned for an invocation",
80 exception
81 );
82 }
83 }
84 }
85
86 fn exception(&self) -> Option<&Exception> {
87 self.result
88 .as_ref()
89 .map(Result::as_ref)
90 .and_then(Result::err)
91 }
92
93 #[allow(dead_code)]
94 fn value(&self) -> Option<&Serialized> {
95 self.result
96 .as_ref()
97 .map(Result::as_ref)
98 .and_then(Result::ok)
99 }
100}
101
/// What the history knows about the invocation behind a ref.
#[derive(Debug, PartialEq)]
enum RefStatus {
    /// The ref is defined by the invocation with this seq, and no failure has
    /// been recorded for it through this ref.
    Invoked(Seq),
    /// The defining invocation (or something it depended on) failed with this
    /// exception; invocations that later use the ref inherit it.
    Errored(Exception),
}
109
/// Tracks in-flight invocations, the refs they define, and how far each rank
/// has progressed, so that results and failures can be reported once every
/// rank has moved past an invocation.
#[derive(Debug)]
// NOTE(review): the reason for allowing `dead_code` is not visible here;
// presumably some items are only exercised by tests — confirm.
#[allow(dead_code)]
pub struct History {
    /// Indexed by rank: the first seq that rank has not completed, as passed
    /// to `rank_completed`.
    first_incomplete_seqs: MinVector<Seq>,
    /// Minimum of `first_incomplete_seqs`; invocations below it have been
    /// completed by every rank.
    min_incomplete_seq: Seq,
    /// Invocations still tracked (not yet completed by every rank), by seq.
    invocations: HashMap<Seq, Invocation>,
    /// For each ref: the invocation that currently defines it, or the
    /// exception it carries.
    invocation_for_ref: HashMap<Ref, RefStatus>,
    /// Refs whose deletion was requested but that could not be dropped yet.
    marked_for_deletion: HashSet<Ref>,
    /// Largest seq passed to `add_invocation`; `None` before the first call.
    max_seq: OptionSeq,
    /// Indexed by rank: progress used for deadline tracking; each update is at
    /// least the rank's entry in `first_incomplete_seqs`.
    first_incomplete_seqs_controller: MinVector<Seq>,
    /// Minimum of `first_incomplete_seqs_controller`.
    min_incompleted_seq_controller: Seq,
    /// Current deadline as `(expected_seq, instant, missed)`. It counts as met
    /// once `min_incompleted_seq_controller` exceeds `expected_seq`; `missed`
    /// is set by `report_deadline_missed`.
    deadline: Option<(Seq, tokio::time::Instant, bool)>,
}
147
/// A fixed-length vector that can report its smallest element cheaply.
///
/// Next to the elements it keeps a multiset of them (`value -> number of
/// occurrences`) in a `BTreeMap`, whose first key is therefore always the
/// current minimum.
#[derive(Debug)]
struct MinVector<T> {
    data: Vec<T>,
    value_counts: BTreeMap<T, usize>,
}

impl<T> MinVector<T>
where
    T: Ord + Copy,
{
    /// Takes ownership of `data` and counts how often each value occurs.
    fn new(data: Vec<T>) -> Self {
        let value_counts = data.iter().fold(BTreeMap::new(), |mut counts, &value| {
            *counts.entry(value).or_insert(0) += 1;
            counts
        });
        MinVector { data, value_counts }
    }

    /// Replaces the element at `index` with `value`, keeping the counts in sync.
    ///
    /// # Panics
    /// Panics if `index` is out of bounds.
    fn set(&mut self, index: usize, value: T) {
        let previous = self.data[index];
        self.data[index] = value;

        // Drop one occurrence of the old value; forget it once none remain so
        // it can no longer be reported as the minimum.
        let previous_gone = match self.value_counts.get_mut(&previous) {
            Some(count) => {
                *count -= 1;
                *count == 0
            }
            None => false,
        };
        if previous_gone {
            self.value_counts.remove(&previous);
        }
        *self.value_counts.entry(value).or_insert(0) += 1;
    }

    /// Element at `index`.
    ///
    /// # Panics
    /// Panics if `index` is out of bounds.
    fn get(&self, index: usize) -> T {
        self.vec()[index]
    }

    /// Smallest element currently stored.
    ///
    /// # Panics
    /// Panics if the vector is empty.
    fn min(&self) -> T {
        let (smallest, _) = self.value_counts.first_key_value().unwrap();
        *smallest
    }

    /// Number of elements.
    fn len(&self) -> usize {
        self.vec().len()
    }

    /// Borrows the underlying elements in index order.
    fn vec(&self) -> &Vec<T> {
        &self.data
    }
}
199
200impl History {
201 pub fn new(world_size: usize) -> Self {
202 Self {
203 first_incomplete_seqs: MinVector::new(vec![Seq::default(); world_size]),
204 min_incomplete_seq: Seq::default(),
205 invocation_for_ref: HashMap::new(),
206 invocations: HashMap::new(),
207 marked_for_deletion: HashSet::new(),
208 max_seq: OptionSeq::from(None),
209 first_incomplete_seqs_controller: MinVector::new(vec![Seq::default(); world_size]),
210 min_incompleted_seq_controller: Seq::default(),
211 deadline: None,
212 }
213 }
214
215 #[cfg(test)]
216 pub fn first_incomplete_seqs(&self) -> &[Seq] {
217 self.first_incomplete_seqs.vec()
218 }
219
220 pub fn first_incomplete_seqs_controller(&self) -> &[Seq] {
221 self.first_incomplete_seqs_controller.vec()
222 }
223
224 pub fn min_incomplete_seq_reported(&self) -> Seq {
225 self.min_incompleted_seq_controller
226 }
227
228 pub fn world_size(&self) -> usize {
229 self.first_incomplete_seqs.len()
230 }
231
232 pub fn delete_invocations_for_refs(&mut self, refs: Vec<Ref>) {
233 self.marked_for_deletion.extend(refs);
234
235 self.marked_for_deletion
236 .retain(|ref_| match self.invocation_for_ref.get(ref_) {
237 Some(RefStatus::Invoked(seq)) => {
238 if seq < &self.min_incomplete_seq {
239 self.invocation_for_ref.remove(ref_);
240 false
241 } else {
242 true
243 }
244 }
245 Some(RefStatus::Errored(_)) => {
246 self.invocation_for_ref.remove(ref_);
247 false
248 }
249 None => true,
250 });
251 }
252
253 pub fn add_invocation(
255 &mut self,
256 seq: Seq,
257 uses: Vec<Ref>,
258 defs: Vec<Ref>,
259 ) -> Vec<(Seq, Option<Result<Serialized, Exception>>)> {
260 let mut results = Vec::new();
261 let input_seq = OptionSeq::from(seq);
262 assert!(
263 input_seq >= self.max_seq,
264 "nonmonotonic seq: {:?}; current max: {:?}",
265 seq,
266 self.max_seq,
267 );
268 self.max_seq = input_seq;
269 let mut invocation = Invocation::new(seq, uses.clone(), defs.clone());
270
271 for use_ in uses {
272 match self.invocation_for_ref.get(&use_) {
274 Some(RefStatus::Errored(exception)) => {
275 if !invocation.reported {
278 invocation.set_exception(exception.clone());
279 results.push((seq, Some(Err(exception.clone()))));
280 invocation.reported = true;
281 }
282 }
283 Some(RefStatus::Invoked(invoked_seq)) => {
284 if let Some(invocation) = self.invocations.get_mut(invoked_seq) {
285 invocation.add_user(seq)
286 }
287 }
288 None => tracing::debug!(
289 "ignoring dependency on potentially complete invocation for ref: {:?}",
290 use_
291 ),
292 }
293 }
294 for def in defs {
295 self.invocation_for_ref.insert(
296 def,
297 match invocation.exception() {
298 Some(err) => RefStatus::Errored(err.clone()),
299 None => RefStatus::Invoked(seq.clone()),
300 },
301 );
302 }
303
304 self.invocations.insert(seq, invocation);
305
306 results
307 }
308
309 pub fn propagate_exception(&mut self, seq: Seq, exception: Exception) {
312 let mut queue = vec![seq];
313 let mut visited = HashSet::new();
314
315 while let Some(seq) = queue.pop() {
316 if !visited.insert(seq) {
317 continue;
318 }
319
320 let Some(invocation) = self.invocations.get_mut(&seq) else {
321 continue;
322 };
323
324 for def in invocation.defs.iter() {
327 match self.invocation_for_ref.get(def) {
328 Some(RefStatus::Invoked(invoked_seq)) if *invoked_seq == seq => self
329 .invocation_for_ref
330 .insert(*def, RefStatus::Errored(exception.clone())),
331 _ => None,
332 };
333 }
334 invocation.set_exception(exception.clone());
335 queue.extend(invocation.users.iter());
336 }
337 }
338
339 fn find_unreported_dependent_exceptions(
340 &mut self,
341 seq: Seq,
342 ) -> Vec<(Seq, Option<Result<Serialized, Exception>>)> {
343 let mut queue = vec![seq];
344 let mut visited = HashSet::new();
345 let mut results = Vec::new();
346
347 while let Some(seq) = queue.pop() {
348 if !visited.insert(seq) {
349 continue;
350 }
351
352 let Some(invocation) = self.invocations.get_mut(&seq) else {
353 continue;
354 };
355
356 if !matches!(invocation.result, Some(Err(_))) || invocation.reported {
357 continue;
358 }
359
360 invocation.reported = true;
361
362 results.push((seq, invocation.result.clone()));
363
364 queue.extend(invocation.users.iter());
365 }
366 results
367 }
368
369 pub fn report_deadline_missed(&mut self) {
370 if let Some((seq, time, _)) = self.deadline {
371 self.deadline = Some((seq, time, true));
372 }
373 }
374
375 pub fn deadline(
376 &mut self,
377 expected_progress: u64,
378 timeout: tokio::time::Duration,
379 clock: &impl Clock,
380 ) -> Option<(Seq, tokio::time::Instant, bool)> {
381 let previous_deadline_completed = match self.deadline {
382 Some((expected_seq, ..)) => self.min_incompleted_seq_controller > expected_seq,
383 None => self.max_seq.inner().is_some(),
384 };
385
386 if previous_deadline_completed {
387 let next_expected_completed_seq = std::cmp::min(
388 OptionSeq::from(u64::from(self.min_incompleted_seq_controller) + expected_progress),
389 self.max_seq.clone(),
390 );
391
392 self.deadline =
393 next_expected_completed_seq
394 .into_inner()
395 .map(|next_expected_completed_seq| {
396 (next_expected_completed_seq, clock.now() + timeout, false)
397 });
398 }
399 self.deadline
400 }
401
402 pub fn update_deadline_tracking(&mut self, rank: usize, seq: Seq) {
403 self.first_incomplete_seqs_controller.set(
407 rank,
408 std::cmp::max(seq, self.first_incomplete_seqs.get(rank)),
409 );
410
411 self.min_incompleted_seq_controller = self.first_incomplete_seqs_controller.min();
412 }
413
414 pub fn rank_completed(
417 &mut self,
418 rank: usize,
419 seq: Seq,
420 ) -> Vec<(Seq, Option<Result<Serialized, Exception>>)> {
421 self.first_incomplete_seqs.set(rank, seq);
422 let prev = self.min_incomplete_seq;
423 self.min_incomplete_seq = self.first_incomplete_seqs.min();
424 self.update_deadline_tracking(rank, seq);
425
426 let mut results: Vec<(Seq, Option<Result<Serialized, Exception>>)> = Vec::new();
427 for i in Seq::iter_between(prev, self.min_incomplete_seq) {
428 if let Some(invocation) = self.invocations.remove(&i) {
429 let retain = if let Some(result) = invocation.result {
430 let is_err = result.is_err();
431 if !invocation.reported {
432 results.push((i, Some(result)));
433 }
434 is_err
435 } else {
436 results.push((i, None));
438 false
439 };
440
441 if retain {
442 for def in &invocation.defs {
446 match self.invocation_for_ref.get(def) {
447 Some(RefStatus::Invoked(seq)) if *seq == i => {
448 self.invocation_for_ref.remove(def)
449 }
450 _ => None,
451 };
452 }
453 }
454 }
455 }
456
457 results.extend(self.find_unreported_dependent_exceptions(seq));
460
461 results
462 }
463
464 #[cfg(test)]
465 fn get_invocation(&self, seq: Seq) -> Option<&Invocation> {
466 self.invocations.get(&seq)
467 }
468
469 pub fn set_result(&mut self, seq: Seq, result: Result<Serialized, WorkerError>) {
470 if let Some(invocation) = self.invocations.get_mut(&seq) {
471 invocation.set_result(result.map_err(|e| Exception::Error(seq, seq, e)));
472 }
473 }
474}
475
476#[derive(Clone, Debug, PartialEq, Eq)]
480pub(crate) struct OptionSeq(Option<Seq>);
481
482impl OptionSeq {
483 pub fn inner(&self) -> &Option<Seq> {
485 &self.0
486 }
487
488 pub fn into_inner(self) -> Option<Seq> {
490 self.0
491 }
492}
493
494impl PartialOrd for OptionSeq {
495 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
496 Some(self.cmp(other))
497 }
498}
499
500impl Ord for OptionSeq {
501 fn cmp(&self, other: &Self) -> Ordering {
502 match (self.0, other.0) {
503 (Some(a), Some(b)) => a.cmp(&b),
504 (Some(_), None) => Ordering::Greater,
505 (None, Some(_)) => Ordering::Less,
506 (None, None) => Ordering::Equal,
507 }
508 }
509}
510
511impl From<u64> for OptionSeq {
512 fn from(value: u64) -> Self {
513 OptionSeq(Some(Seq::from(value)))
514 }
515}
516
517impl From<Seq> for OptionSeq {
518 fn from(seq: Seq) -> Self {
519 OptionSeq(Some(seq))
520 }
521}
522
523impl From<Option<Seq>> for OptionSeq {
524 fn from(value: Option<Seq>) -> Self {
525 OptionSeq(value)
526 }
527}
528
529#[cfg(test)]
530mod tests {
531 use std::assert_matches::assert_matches;
532
533 use hyperactor::id;
534
535 use super::*;
536
    /// Depth-first walk over an invocation and every tracked invocation that
    /// transitively uses its outputs, yielding each invocation at most once.
    struct InvocationUsersIterator<'a> {
        /// History whose `invocations` map resolves user seqs.
        history: &'a History,
        /// Invocations still to visit (popped last-in, first-out).
        stack: Vec<&'a Invocation>,
        /// Seqs that have already been yielded.
        visited: HashSet<Seq>,
    }
542
543 impl<'a> Iterator for InvocationUsersIterator<'a> {
544 type Item = &'a Invocation;
545
546 fn next(&mut self) -> Option<Self::Item> {
547 while let Some(invocation) = self.stack.pop() {
548 if !self.visited.insert(invocation.seq) {
549 continue;
550 }
551 self.stack.extend(
552 invocation
553 .users
554 .iter()
555 .filter_map(|seq| self.history.invocations.get(seq)),
556 );
557 return Some(invocation);
558 }
559 None
560 }
561 }
562
563 impl History {
564 pub(crate) fn iter_users_transitive(&self, seq: Seq) -> impl Iterator<Item = Seq> + '_ {
570 let invocations = self
571 .invocations
572 .get(&seq)
573 .map_or(Vec::default(), |invocation| vec![invocation]);
574
575 InvocationUsersIterator {
576 history: self,
577 stack: invocations,
578 visited: HashSet::new(),
579 }
580 .map(|invocation| invocation.seq)
581 }
582 }
583
584 #[test]
585 fn simple_history() {
586 let mut history = History::new(2);
587 history.add_invocation(0.into(), vec![], vec![Ref { id: 1 }, Ref { id: 2 }]);
588 history.add_invocation(1.into(), vec![Ref { id: 1 }], vec![Ref { id: 3 }]);
589 history.add_invocation(2.into(), vec![Ref { id: 3 }], vec![Ref { id: 4 }]);
590 history.add_invocation(3.into(), vec![Ref { id: 3 }], vec![Ref { id: 5 }]);
591 history.add_invocation(4.into(), vec![Ref { id: 3 }], vec![Ref { id: 6 }]);
592 history.add_invocation(5.into(), vec![Ref { id: 4 }], vec![Ref { id: 7 }]);
593 history.add_invocation(6.into(), vec![Ref { id: 4 }], vec![Ref { id: 8 }]);
594
595 let mut res = history
596 .iter_users_transitive(1.into())
597 .collect::<Vec<Seq>>();
598 res.sort();
599 assert_eq!(
600 res,
601 vec![1.into(), 2.into(), 3.into(), 4.into(), 5.into(), 6.into()]
602 );
603
604 history.rank_completed(0, 2.into());
605 let mut res = history
606 .iter_users_transitive(1.into())
607 .collect::<Vec<Seq>>();
608 res.sort();
609 assert_eq!(
610 res,
611 vec![1.into(), 2.into(), 3.into(), 4.into(), 5.into(), 6.into()]
612 );
613
614 history.rank_completed(1, 2.into());
615 let res = history
616 .iter_users_transitive(1.into())
617 .collect::<Vec<Seq>>();
618 assert_eq!(res, vec![]);
619
620 history.add_invocation(7.into(), vec![Ref { id: 1 }], vec![]);
622 }
623
624 #[test]
625 fn delete_errored_invocations() {
626 let mut history = History::new(1);
627 history.add_invocation(0.into(), vec![], vec![Ref { id: 1 }, Ref { id: 2 }]);
628 history.add_invocation(1.into(), vec![Ref { id: 1 }], vec![Ref { id: 3 }]);
629 history.propagate_exception(
630 0.into(),
631 Exception::Error(
632 0.into(),
633 0.into(),
634 WorkerError {
635 backtrace: "worker error happened".to_string(),
636 worker_actor_id: id!(test[234].testactor[6]),
637 },
638 ),
639 );
640 history.delete_invocations_for_refs(vec![Ref { id: 1 }, Ref { id: 2 }]);
641 history.rank_completed(0, 1.into());
642 assert_eq!(history.invocation_for_ref.len(), 1);
643 history.delete_invocations_for_refs(vec![Ref { id: 3 }]);
644 history.rank_completed(0, 2.into());
645 assert!(history.invocation_for_ref.is_empty());
646 }
647
648 #[test]
649 fn redefinitions() {
650 let mut history = History::new(2);
651 history.add_invocation(0.into(), vec![], vec![Ref { id: 1 }, Ref { id: 2 }]);
652 history.add_invocation(1.into(), vec![Ref { id: 1 }], vec![Ref { id: 3 }]);
653 history.add_invocation(2.into(), vec![Ref { id: 3 }], vec![Ref { id: 4 }]);
654
655 let mut res = history
656 .iter_users_transitive(1.into())
657 .collect::<Vec<Seq>>();
658 res.sort();
659 assert_eq!(res, vec![1.into(), 2.into()]);
660
661 history.add_invocation(3.into(), vec![Ref { id: 3 }], vec![Ref { id: 3 }]);
662 history.add_invocation(4.into(), vec![Ref { id: 3 }], vec![Ref { id: 6 }]);
663 history.add_invocation(5.into(), vec![Ref { id: 4 }], vec![Ref { id: 7 }]);
664 history.add_invocation(6.into(), vec![Ref { id: 4 }], vec![Ref { id: 8 }]);
665
666 history.rank_completed(0, 2.into());
667 history.rank_completed(1, 2.into());
668
669 let res = history
670 .iter_users_transitive(3.into())
671 .collect::<Vec<Seq>>();
672 assert_eq!(res, vec![3.into(), 4.into()]);
673 }
674
675 #[test]
676 fn min_vector() {
677 let data = vec![3, 1, 4, 1, 5, 9, 2, 6, 5, 3, 5];
679 let mut min_vector = MinVector::new(data.clone());
680
681 assert_eq!(min_vector.len(), data.len());
683
684 assert_eq!(min_vector.vec(), &data);
686
687 assert_eq!(min_vector.min(), 1);
689
690 for (i, &value) in data.iter().enumerate() {
692 assert_eq!(min_vector.get(i), value);
693 }
694
695 min_vector.set(0, 0); assert_eq!(min_vector.get(0), 0);
698 assert_eq!(min_vector.min(), 0);
699 min_vector.set(1, 7); assert_eq!(min_vector.get(1), 7);
701 assert_eq!(min_vector.min(), 0);
702 min_vector.set(0, 8); assert_eq!(min_vector.get(0), 8);
704 assert_eq!(min_vector.min(), 1); min_vector.set(2, 5); assert_eq!(min_vector.get(2), 5);
709 assert_eq!(min_vector.min(), 1);
710
711 min_vector.set(3, 0); assert_eq!(min_vector.get(3), 0);
714 assert_eq!(min_vector.min(), 0);
715
716 for i in 0..min_vector.len() {
718 min_vector.set(i, 10);
719 }
720 assert_eq!(min_vector.min(), 10);
721 assert_eq!(min_vector.vec(), &vec![10; min_vector.len()]);
722 }
723
724 #[test]
725 fn failure_propagation() {
726 let mut history = History::new(2);
727
728 history.add_invocation(0.into(), vec![], vec![Ref { id: 1 }, Ref { id: 2 }]);
729 history.add_invocation(1.into(), vec![Ref { id: 1 }], vec![Ref { id: 3 }]);
730 history.add_invocation(
731 2.into(),
732 vec![Ref { id: 3 }],
733 vec![Ref { id: 4 }, Ref { id: 5 }],
734 );
735 history.add_invocation(3.into(), vec![Ref { id: 2 }], vec![Ref { id: 6 }]);
736 history.add_invocation(4.into(), vec![Ref { id: 5 }], vec![Ref { id: 6 }]);
737
738 for i in 1..=3 {
740 assert!(
741 history
742 .get_invocation(i.into())
743 .unwrap()
744 .exception()
745 .is_none()
746 );
747 }
748
749 history.propagate_exception(
752 1.into(),
753 Exception::Error(
754 1.into(),
755 1.into(),
756 WorkerError {
757 backtrace: "worker error happened".to_string(),
758 worker_actor_id: "test[234].testactor[6]".parse().unwrap(),
759 },
760 ),
761 );
762
763 for i in [1, 2, 4] {
765 assert!(
766 history
767 .get_invocation(i.into())
768 .unwrap()
769 .exception()
770 .is_some()
771 );
772 }
773
774 for i in [0, 3] {
776 assert!(
777 history
778 .get_invocation(i.into())
779 .unwrap()
780 .exception()
781 .is_none()
782 );
783 }
784
785 history.rank_completed(0, 2.into());
790 history.rank_completed(1, 2.into());
791
792 for i in [3, 4, 5, 6] {
793 assert_matches!(
794 history.invocation_for_ref.get(&i.into()),
795 Some(RefStatus::Errored(_)),
796 );
797 history.add_invocation((i + 2).into(), vec![Ref { id: i }], vec![Ref { id: 7 }]);
799 assert!(
800 history
801 .get_invocation((i + 2).into())
802 .unwrap()
803 .exception()
804 .is_some()
805 );
806 }
807
808 history.set_result(
810 2.into(),
811 Ok(Serialized::serialize(&"2".to_string()).unwrap()),
812 );
813 assert!(
815 history
816 .get_invocation((2).into())
817 .unwrap()
818 .exception()
819 .is_some()
820 );
821 assert!(
822 history
823 .get_invocation((2).into())
824 .unwrap()
825 .value()
826 .is_none()
827 );
828 }
829
830 #[test]
831 fn test_option_seq_comparision() {
832 assert_eq!(OptionSeq::from(None), OptionSeq::from(None));
833 assert_eq!(OptionSeq::from(1), OptionSeq::from(Seq::from(1)));
834 assert_eq!(OptionSeq::from(1), OptionSeq::from(Some(Seq::from(1))));
835
836 assert!(OptionSeq::from(None) < OptionSeq::from(0));
837 assert!(OptionSeq::from(0) < OptionSeq::from(1));
838
839 assert!(OptionSeq::from(0) > OptionSeq::from(None));
840 assert!(OptionSeq::from(1) > OptionSeq::from(0));
841 }
842}