1use std::collections::HashMap;
10use std::collections::HashSet;
11use std::fmt::Debug;
12use std::time::SystemTime;
13
14use async_trait::async_trait;
15use hyperactor::Actor;
16use hyperactor::Bind;
17use hyperactor::Context;
18use hyperactor::Endpoint as _;
19use hyperactor::Handler;
20use hyperactor::Instance;
21use hyperactor::RemoteEndpoint as _;
22use hyperactor::Unbind;
23use hyperactor::actor::ActorError;
24use hyperactor::actor::ActorErrorKind;
25use hyperactor::actor::ActorStatus;
26use hyperactor::actor::Referable;
27use hyperactor::actor::handle_undeliverable_message;
28use hyperactor::context;
29use hyperactor::kv_pairs;
30use hyperactor::mailbox::MessageEnvelope;
31use hyperactor::mailbox::RemoteMessage;
32use hyperactor::mailbox::Undeliverable;
33use hyperactor::supervision::ActorSupervisionEvent;
34use hyperactor_config::CONFIG;
35use hyperactor_config::ConfigAttr;
36use hyperactor_config::Flattrs;
37use hyperactor_config::attrs::declare_attrs;
38use hyperactor_telemetry::declare_static_counter;
39use ndslice::ViewExt;
40use ndslice::view::CollectMeshExt;
41use ndslice::view::Point;
42use ndslice::view::Ranked;
43use opentelemetry::metrics::Counter;
44use serde::Deserialize;
45use serde::Serialize;
46use tokio::time::Duration;
47use typeuri::Named;
48
49use crate::ValueMesh;
50use crate::actor_mesh::ActorMeshRef;
51use crate::bootstrap::ProcStatus;
52use crate::casting::CAST_ACTOR_MESH_ID;
53use crate::casting::update_undeliverable_envelope_for_casting;
54use crate::mesh_id::ResourceId;
55use crate::proc_agent::ActorState;
56use crate::proc_agent::MESH_ORPHAN_TIMEOUT;
57use crate::proc_mesh::ProcMeshRef;
58use crate::resource;
59use crate::supervision::MeshFailure;
60use crate::supervision::Unhealthy;
61
/// Name string for the actor mesh controller.
/// NOTE(review): where this name is used is not visible in this file section.
pub const ACTOR_MESH_CONTROLLER_NAME: &str = "actor_mesh_controller";

// Interval between supervision polls (`CheckState` ticks) of a
// `ResourceController`. `check_stall` also uses it as the tolerance before a
// late tick is reported as stalled. Defaults to 10 seconds; the
// `HYPERACTOR_MESH_SUPERVISION_POLL_FREQUENCY` key is registered for it via
// `ConfigAttr::new` (presumably an environment override — confirm).
declare_attrs! {
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_MESH_SUPERVISION_POLL_FREQUENCY".to_string()),
        None,
    ))
    pub attr SUPERVISION_POLL_FREQUENCY: Duration = Duration::from_secs(10);
}

// Counts late `CheckState` ticks observed by actor mesh controllers
// (incremented by `check_stall`).
declare_static_counter!(
    ACTOR_MESH_CONTROLLER_SUPERVISION_STALLS,
    "actor.actor_mesh_controller.num_stalls"
);

// Counts late `CheckState` ticks observed by proc mesh controllers
// (incremented by `check_stall`).
declare_static_counter!(
    PROC_MESH_CONTROLLER_SUPERVISION_STALLS,
    "actor.proc_mesh_controller.num_stalls"
);
89
90#[derive(Debug)]
98pub struct HealthState {
99 statuses: HashMap<Point, (resource::Status, u64)>,
103 unhealthy_event: Option<Unhealthy>,
106 crashed_ranks: HashMap<usize, ActorSupervisionEvent>,
109 owner: Option<hyperactor::PortRef<MeshFailure>>,
112 subscribers: HashSet<hyperactor::PortRef<Option<MeshFailure>>>,
115}
116
117impl HealthState {
118 fn new(
119 statuses: HashMap<Point, resource::Status>,
120 owner: Option<hyperactor::PortRef<MeshFailure>>,
121 ) -> Self {
122 Self {
123 statuses: statuses
124 .into_iter()
125 .map(|(point, status)| (point, (status, 0)))
126 .collect(),
127 unhealthy_event: None,
128 crashed_ranks: HashMap::new(),
129 owner,
130 subscribers: HashSet::new(),
131 }
132 }
133
134 fn maybe_update(&mut self, point: Point, status: resource::Status, generation: u64) -> bool {
138 use std::collections::hash_map::Entry;
139 match self.statuses.entry(point) {
140 Entry::Occupied(mut entry) => {
141 let (old_status, old_gen) = entry.get();
142 if old_status.is_terminating() || *old_gen > generation {
145 return false;
146 }
147 let changed = *old_status != status;
148 *entry.get_mut() = (status, generation);
149 changed
150 }
151 Entry::Vacant(entry) => {
152 entry.insert((status, generation));
153 true
154 }
155 }
156 }
157
158 fn all_terminating(&self) -> bool {
160 self.statuses.values().all(|(s, _)| s.is_terminating())
161 }
162
163 fn any_terminating(&self) -> bool {
165 self.statuses.values().any(|(s, _)| s.is_terminating())
166 }
167
168 pub(crate) fn apply_updates_and_notify<S: Clone + 'static>(
174 &mut self,
175 states: &ValueMesh<resource::State<S>>,
176 mut on_change: impl FnMut(resource::State<S>, &mut HealthState) -> bool,
177 ) -> bool {
178 let mut did_notify = false;
179 for (point, state) in states.iter() {
180 let status = state.status.clone();
181 let generation = state.generation;
182 if self.maybe_update(point, status, generation) && on_change(state, self) {
183 did_notify = true;
184 }
185 }
186 did_notify
187 }
188}
189
/// Outcome of one `Controlled::poll_states` call, as interpreted by
/// `ResourceController::handle_check_state`.
pub enum PollResult {
    /// Polling stopped early (e.g. a state query failed, or the actor mesh saw a
    /// terminating proc). The next check is scheduled without any heartbeat or
    /// all-terminating evaluation.
    Reschedule,
    /// States were fetched and applied. `did_notify` is true if any change
    /// produced a notification. When it is false and no status is terminating,
    /// the controller sends subscribers a heartbeat instead.
    Processed { did_notify: bool },
}
198
199fn compute_keepalive() -> Option<SystemTime> {
202 hyperactor_config::global::get(MESH_ORPHAN_TIMEOUT).map(|d| SystemTime::now() + d)
203}
204
/// Registers a port to receive supervision messages from a `ResourceController`.
/// The port receives `Some(MeshFailure)` for recorded events and `None` as a
/// heartbeat. If the mesh is already unhealthy, the recorded failure is sent to
/// the new subscriber immediately.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Named, Bind, Unbind)]
pub struct Subscribe(pub hyperactor::PortRef<Option<MeshFailure>>);
wirevalue::register_type!(Subscribe);

/// Removes a port previously registered with [`Subscribe`].
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Named, Bind, Unbind)]
pub struct Unsubscribe(pub hyperactor::PortRef<Option<MeshFailure>>);
wirevalue::register_type!(Unsubscribe);

/// Asks the controller to post its current number of subscribers to the given port.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Named, Bind, Unbind)]
pub struct GetSubscriberCount(#[binding(include)] pub hyperactor::PortRef<usize>);
wirevalue::register_type!(GetSubscriberCount);

/// Periodic self-message that triggers a supervision poll. It carries the time
/// at which it was expected to be handled, so `check_stall` can detect late
/// handling.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Named, Bind, Unbind)]
pub struct CheckState(pub SystemTime);
wirevalue::register_type!(CheckState);
232
// Header set to `true` on every message posted to a subscriber (failures and
// heartbeats alike). When such a message comes back undeliverable,
// `handle_undeliverable_message` uses it to recognize and drop the dead
// subscriber.
declare_attrs! {
    pub attr ACTOR_MESH_SUBSCRIBER_MESSAGE: bool;
}
238
239fn send_subscriber_message(
240 cx: &impl context::Actor,
241 subscriber: &hyperactor::PortRef<Option<MeshFailure>>,
242 message: MeshFailure,
243) {
244 let mut headers = Flattrs::new();
245 headers.set(ACTOR_MESH_SUBSCRIBER_MESSAGE, true);
246 subscriber.post_with_headers(cx, headers, Some(message.clone()));
247 tracing::info!(event = %message, "sent supervision failure message to subscriber {}", subscriber.port_addr());
248}
249
250fn send_heartbeat(cx: &impl context::Actor, health_state: &HealthState) {
257 tracing::debug!(
258 num_subscribers = health_state.subscribers.len(),
259 "sending heartbeat to subscribers",
260 );
261
262 for subscriber in health_state.subscribers.iter() {
263 let mut headers = Flattrs::new();
264 headers.set(ACTOR_MESH_SUBSCRIBER_MESSAGE, true);
265 subscriber.post_with_headers(cx, headers, None);
266 }
267}
268
269fn send_state_change(
274 cx: &impl context::Actor,
275 rank: usize,
276 event: ActorSupervisionEvent,
277 mesh_name: &ResourceId,
278 is_proc_stopped: bool,
279 health_state: &mut HealthState,
280) {
281 let is_failed = event.is_error();
284 if is_failed {
285 tracing::warn!(
286 name = "SupervisionEvent",
287 actor_mesh = %mesh_name,
288 %event,
289 "detected supervision error on monitored mesh: name={mesh_name}",
290 );
291 } else {
292 tracing::debug!(
293 name = "SupervisionEvent",
294 actor_mesh = %mesh_name,
295 %event,
296 "detected non-error supervision event on monitored mesh: name={mesh_name}",
297 );
298 }
299
300 let failure_message = MeshFailure {
301 actor_mesh_name: Some(mesh_name.to_string()),
302 event: event.clone(),
303 crashed_ranks: vec![rank],
304 };
305 health_state.crashed_ranks.insert(rank, event.clone());
306 health_state.unhealthy_event = Some(if is_proc_stopped {
307 Unhealthy::StreamClosed(failure_message.clone())
308 } else {
309 Unhealthy::Crashed(failure_message.clone())
310 });
311 if is_failed && let Some(owner) = &health_state.owner {
316 owner.post(cx, failure_message.clone());
317 tracing::info!(actor_mesh = %mesh_name, %event, "sent supervision failure message to owner {}", owner.port_addr());
318 }
319 for subscriber in health_state.subscribers.iter() {
322 send_subscriber_message(cx, subscriber, failure_message.clone());
323 }
324}
325
326fn actor_state_to_supervision_events(
327 state: resource::State<ActorState>,
328) -> (usize, Vec<ActorSupervisionEvent>) {
329 let (rank, actor_id, events) = match state.state {
330 Some(inner) => (
331 inner.create_rank,
332 Some(inner.actor_id),
333 inner.supervision_events.clone(),
334 ),
335 None => (0, None, vec![]),
336 };
337 let events = match state.status {
338 resource::Status::NotExist | resource::Status::Stopped | resource::Status::Timeout(_) => {
341 if !events.is_empty() {
343 events
344 } else {
345 vec![ActorSupervisionEvent::new(
346 actor_id.expect("actor_id is None"),
347 None,
348 ActorStatus::Stopped(
349 format!(
350 "actor status is {}; actor may have been killed",
351 state.status
352 )
353 .to_string(),
354 ),
355 None,
356 )]
357 }
358 }
359 resource::Status::Failed(_) => events,
360 _ => vec![],
362 };
363 (rank, events)
364}
365
366fn proc_status_to_actor_status(proc_status: Option<ProcStatus>) -> ActorStatus {
375 match proc_status {
376 Some(ProcStatus::Stopped { exit_code: 0, .. }) => {
377 ActorStatus::Stopped("process exited cleanly".to_string())
378 }
379 Some(ProcStatus::Stopped { exit_code, .. }) => {
380 ActorStatus::Failed(ActorErrorKind::Generic(format!(
381 "the process this actor was running on exited with non-zero code {}",
382 exit_code
383 )))
384 }
385 Some(ProcStatus::Stopping { .. }) => {
388 ActorStatus::Stopped("process is stopping".to_string())
389 }
390 None => ActorStatus::Stopped("no status received from process".to_string()),
392 Some(status) => ActorStatus::Failed(ActorErrorKind::Generic(format!(
393 "the process this actor was running on failed: {}",
394 status
395 ))),
396 }
397}
398
399fn format_system_time(time: SystemTime) -> String {
400 let datetime: chrono::DateTime<chrono::Local> = time.into();
401 datetime.to_rfc3339()
402}
403
404fn check_stall(expected_time: SystemTime, actor_id: &hyperactor::ActorId, counter: &Counter<u64>) {
409 if SystemTime::now()
410 <= expected_time + hyperactor_config::global::get(SUPERVISION_POLL_FREQUENCY)
411 {
412 return;
413 }
414 let expected_time = format_system_time(expected_time);
415 counter.add(
416 1,
417 kv_pairs!("actor_id" => actor_id.to_string(), "expected_time" => expected_time.clone()),
418 );
419 tracing::warn!(
420 %actor_id,
421 "Handler<CheckState> is stalled, expected at {}",
422 expected_time,
423 );
424}
425
/// Mesh-specific behavior that a `ResourceController` delegates to. It is
/// implemented below for `ActorMeshRef<A>` and `ProcMeshRef`.
#[async_trait]
pub trait Controlled: Clone + Debug + Send + Sync + 'static {
    /// Per-resource state type carried in streamed/polled `resource::State` messages.
    type StateInner: RemoteMessage + Clone + Debug + 'static;

    /// Counter that `check_stall` increments when this controller's
    /// `CheckState` is handled late.
    fn stall_counter() -> &'static Counter<u64>;

    /// Identifier of the controlled mesh. It is used in log and failure
    /// messages, and as the fallback supervision display name.
    fn id(&self) -> &ResourceId;

    /// Region of the mesh. It is used to map ranks to points and to shape the
    /// status `ValueMesh` in `GetState` replies.
    fn region(&self) -> &ndslice::Region;

    /// Asks the mesh's agents to stream state updates to `subscriber`, which
    /// `Actor::init` binds to the controller's `Handler<resource::State<..>>`.
    fn subscribe_to_stream(
        &self,
        cx: &impl context::Actor,
        subscriber: hyperactor::PortRef<resource::State<Self::StateInner>>,
    ) -> anyhow::Result<()>;

    /// Forwards a `WaitRankStatus` request received by the controller to the
    /// mesh's agents.
    fn forward_wait_rank_status(
        &self,
        cx: &impl context::Actor,
        msg: resource::WaitRankStatus,
    ) -> anyhow::Result<()>;

    /// Performs one supervision poll (invoked per `CheckState` tick), updating
    /// `health_state` and notifying as needed. The result tells the controller
    /// how to continue (see `PollResult`).
    async fn poll_states(
        &self,
        cx: &impl context::Actor,
        supervision_display_name: &str,
        health_state: &mut HealthState,
    ) -> PollResult;

    /// Applies one streamed state update to `health_state`. The implementations
    /// here return whether a notification was sent; the controller ignores
    /// that value.
    fn process_state(
        &self,
        cx: &impl context::Actor,
        state: resource::State<Self::StateInner>,
        health_state: &mut HealthState,
    ) -> bool;

    /// Stops the mesh in response to an explicit `resource::Stop`. It is called
    /// after the controller has already cleared its monitor flag.
    async fn handle_stop_request(
        &self,
        cx: &impl context::Actor,
        supervision_display_name: &str,
        reason: String,
        health_state: &mut HealthState,
    ) -> anyhow::Result<()>;

    /// Stops the mesh from `Actor::cleanup` when the controller shuts down while
    /// still monitoring.
    async fn cleanup_stop(&self, cx: &impl context::Actor, reason: String) -> anyhow::Result<()>;
}
499
/// Actor that supervises a mesh of type `T`.
///
/// It periodically polls the mesh (`CheckState`) and receives streamed state
/// updates. It tracks per-point health and reports failures to an optional
/// owner and to any subscribers.
#[hyperactor::export(
    handlers=[
        Subscribe,
        Unsubscribe,
        GetSubscriberCount,
        CheckState,
        resource::WaitRankStatus,
        resource::CreateOrUpdate<resource::mesh::Spec<()>>,
        resource::GetState<resource::mesh::State<()>>,
        resource::Stop,
        resource::State<T::StateInner>,
    ]
)]
pub struct ResourceController<T: Controlled> {
    /// The supervised mesh.
    mesh: T,
    /// Optional display name for supervision messages; falls back to the mesh id.
    supervision_display_name: Option<String>,
    /// Statuses, recorded failures, owner port and subscribers.
    health_state: HealthState,
    /// `Some(())` while monitoring is active. It is set in `init` and cleared by
    /// a stop request, by cleanup, or once every tracked status is terminating.
    /// While it is `None`, `CheckState` ticks are ignored and no new ones are
    /// scheduled.
    monitor: Option<()>,
}

/// Controller specialized to actor meshes.
pub type ActorMeshController<A> = ResourceController<ActorMeshRef<A>>;
540
541impl<T: Controlled> ResourceController<T> {
542 pub(crate) fn new(
544 mesh: T,
545 supervision_display_name: Option<String>,
546 owner: Option<hyperactor::PortRef<MeshFailure>>,
547 initial_statuses: ValueMesh<resource::Status>,
548 ) -> Self {
549 Self {
550 mesh,
551 supervision_display_name,
552 health_state: HealthState::new(initial_statuses.iter().collect(), owner),
553 monitor: None,
554 }
555 }
556
557 pub(crate) fn supervision_display_name(&self) -> String {
559 self.supervision_display_name
560 .clone()
561 .unwrap_or_else(|| self.mesh.id().to_string())
562 }
563
564 fn schedule_next_check(&self, send_fn: impl FnOnce(CheckState, Duration)) {
570 if self.monitor.is_some() {
571 let delay = hyperactor_config::global::get(SUPERVISION_POLL_FREQUENCY);
572 send_fn(CheckState(SystemTime::now() + delay), delay);
573 }
574 }
575
576 fn mesh_status(&self) -> resource::Status {
578 if let Some(Unhealthy::Crashed(e)) = &self.health_state.unhealthy_event {
579 resource::Status::Failed(e.to_string())
580 } else if let Some(Unhealthy::StreamClosed(_)) = &self.health_state.unhealthy_event {
581 resource::Status::Stopped
582 } else if self.monitor.is_none() {
583 resource::Status::Stopped
584 } else {
585 resource::Status::Running
586 }
587 }
588
589 fn handle_get_state_msg(
591 &self,
592 cx: &impl context::Actor,
593 message: resource::GetState<resource::mesh::State<()>>,
594 ) -> anyhow::Result<()> {
595 let status = self.mesh_status();
596 let mut statuses = self
597 .health_state
598 .statuses
599 .iter()
600 .map(|(p, (s, _))| (p.clone(), s.clone()))
601 .collect::<Vec<_>>();
602 statuses.sort_by_key(|(p, _)| p.rank());
603 let statuses: ValueMesh<resource::Status> =
604 statuses
605 .into_iter()
606 .map(|(_, s)| s)
607 .collect_mesh::<ValueMesh<_>>(self.mesh.region().clone())?;
608 let state = resource::mesh::State {
609 statuses,
610 state: (),
611 };
612 message.reply.post(
613 cx,
614 resource::State {
615 id: message.id,
616 status,
617 state: Some(state),
618 generation: 0,
619 timestamp: SystemTime::now(),
620 },
621 );
622 Ok(())
623 }
624
625 fn stop_if_all_terminating(&mut self) {
627 if self.health_state.all_terminating() {
628 self.monitor.take();
629 }
630 }
631
632 async fn handle_check_state(
633 &mut self,
634 cx: &Context<'_, Self>,
635 expected_time: SystemTime,
636 ) -> anyhow::Result<()>
637 where
638 resource::State<T::StateInner>: RemoteMessage,
639 {
640 if self.monitor.is_none() {
641 return Ok(());
642 }
643 check_stall(expected_time, cx.self_addr().id(), T::stall_counter());
644
645 let display = self.supervision_display_name();
646 let result = self
647 .mesh
648 .poll_states(cx, &display, &mut self.health_state)
649 .await;
650
651 match result {
652 PollResult::Reschedule => {
653 self.schedule_next_check(|msg, delay| cx.post_after(cx, msg, delay));
654 }
655 PollResult::Processed { did_notify } => {
656 if !did_notify && !self.health_state.any_terminating() {
660 send_heartbeat(cx, &self.health_state);
661 }
662 if !self.health_state.all_terminating() {
663 self.schedule_next_check(|msg, delay| cx.post_after(cx, msg, delay));
664 } else {
665 self.monitor.take();
666 }
667 }
668 }
669 Ok(())
670 }
671}
672
673impl<T: Controlled> Debug for ResourceController<T> {
674 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
675 f.debug_struct("ResourceController")
676 .field("mesh", &self.mesh)
677 .field("health_state", &self.health_state)
678 .field("monitor", &self.monitor)
679 .finish()
680 }
681}
682
/// Declares the controller as a `resource::mesh::Mesh` with unit spec and
/// state. This matches the `CreateOrUpdate<Spec<()>>` and
/// `GetState<State<()>>` handlers it exports.
impl<T: Controlled> resource::mesh::Mesh for ResourceController<T> {
    type Spec = ();
    type State = ();
}
687
688#[async_trait]
689impl<T: Controlled> Actor for ResourceController<T>
690where
691 resource::State<T::StateInner>: RemoteMessage,
692{
693 async fn init(&mut self, this: &Instance<Self>) -> Result<(), anyhow::Error> {
694 this.set_system();
695
696 let subscriber =
714 hyperactor::PortRef::<resource::State<T::StateInner>>::attest_handler_port(
715 &this.self_addr().clone(),
716 )
717 .unsplit();
718 self.mesh.subscribe_to_stream(this, subscriber)?;
719
720 self.monitor = Some(());
722 self.schedule_next_check(|msg, delay| this.post_after(this, msg, delay));
723
724 let owner = if let Some(owner) = &self.health_state.owner {
725 owner.to_string()
726 } else {
727 String::from("None")
728 };
729 tracing::info!(
730 actor_id = %this.self_addr(),
731 %owner,
732 "started resource controller for {}",
733 self.mesh.id()
734 );
735 Ok(())
736 }
737
738 async fn cleanup(
739 &mut self,
740 this: &Instance<Self>,
741 _err: Option<&ActorError>,
742 ) -> Result<(), anyhow::Error> {
743 if self.monitor.take().is_some() {
744 tracing::info!(
745 actor_id = %this.self_addr(),
746 mesh = %self.mesh.id(),
747 "starting cleanup for ResourceController, stopping mesh",
748 );
749 self.mesh
750 .cleanup_stop(this, "resource controller cleanup".to_string())
751 .await?;
752 }
753 Ok(())
754 }
755
756 async fn handle_undeliverable_message(
757 &mut self,
758 cx: &Instance<Self>,
759 mut envelope: Undeliverable<MessageEnvelope>,
760 ) -> Result<(), anyhow::Error> {
761 envelope = update_undeliverable_envelope_for_casting(envelope);
762 let Some(returned) = envelope.as_message() else {
763 return handle_undeliverable_message(cx, envelope);
764 };
765 if let Some(true) = returned.headers().get(ACTOR_MESH_SUBSCRIBER_MESSAGE) {
766 let dest_port_id = returned.dest().clone();
771 let port = hyperactor::PortRef::<Option<MeshFailure>>::attest(dest_port_id);
772 let did_exist = self.health_state.subscribers.remove(&port);
773 if did_exist {
774 tracing::debug!(
775 actor_id = %cx.self_addr(),
776 num_subscribers = self.health_state.subscribers.len(),
777 "ResourceController: removed subscriber {} from mesh controller",
778 port.port_addr()
779 );
780 }
781 Ok(())
782 } else if returned.headers().get(CAST_ACTOR_MESH_ID).is_some() {
783 tracing::warn!(
788 actor_id = %cx.self_addr(),
789 dest = %returned.dest(),
790 "ResourceController: ignoring undeliverable cast message",
791 );
792 Ok(())
793 } else {
794 handle_undeliverable_message(cx, envelope)
795 }
796 }
797}
798
799#[async_trait]
800impl<T: Controlled> Handler<Subscribe> for ResourceController<T>
801where
802 resource::State<T::StateInner>: RemoteMessage,
803{
804 async fn handle(&mut self, cx: &Context<Self>, message: Subscribe) -> anyhow::Result<()> {
805 if let Some(unhealthy) = &self.health_state.unhealthy_event {
812 let msg = match unhealthy {
813 Unhealthy::StreamClosed(msg) | Unhealthy::Crashed(msg) => msg,
814 };
815 let mut replay_msg = msg.clone();
816 replay_msg.crashed_ranks = self.health_state.crashed_ranks.keys().copied().collect();
817 send_subscriber_message(cx, &message.0, replay_msg);
818 }
819 let port_id = message.0.port_addr().clone();
820 if self.health_state.subscribers.insert(message.0) {
821 tracing::debug!(
822 actor_id = %cx.self_addr(),
823 num_subscribers = self.health_state.subscribers.len(),
824 "added subscriber {} to mesh controller",
825 port_id
826 );
827 }
828 Ok(())
829 }
830}
831
832#[async_trait]
833impl<T: Controlled> Handler<Unsubscribe> for ResourceController<T>
834where
835 resource::State<T::StateInner>: RemoteMessage,
836{
837 async fn handle(&mut self, cx: &Context<Self>, message: Unsubscribe) -> anyhow::Result<()> {
838 if self.health_state.subscribers.remove(&message.0) {
839 tracing::debug!(
840 actor_id = %cx.self_addr(),
841 num_subscribers = self.health_state.subscribers.len(),
842 "removed subscriber {} from mesh controller",
843 message.0.port_addr()
844 );
845 }
846 Ok(())
847 }
848}
849
850#[async_trait]
851impl<T: Controlled> Handler<GetSubscriberCount> for ResourceController<T>
852where
853 resource::State<T::StateInner>: RemoteMessage,
854{
855 async fn handle(
856 &mut self,
857 cx: &Context<Self>,
858 message: GetSubscriberCount,
859 ) -> anyhow::Result<()> {
860 message.0.post(cx, self.health_state.subscribers.len());
861 Ok(())
862 }
863}
864
#[async_trait]
impl<T: Controlled> Handler<resource::CreateOrUpdate<resource::mesh::Spec<()>>>
    for ResourceController<T>
where
    resource::State<T::StateInner>: RemoteMessage,
{
    /// Accepted but ignored: this handler does nothing and returns `Ok(())`.
    async fn handle(
        &mut self,
        _cx: &Context<Self>,
        _message: resource::CreateOrUpdate<resource::mesh::Spec<()>>,
    ) -> anyhow::Result<()> {
        Ok(())
    }
}
881
#[async_trait]
impl<T: Controlled> Handler<resource::GetState<resource::mesh::State<()>>> for ResourceController<T>
where
    resource::State<T::StateInner>: RemoteMessage,
{
    /// Replies with the aggregate mesh status and per-point statuses
    /// (delegates to `handle_get_state_msg`).
    async fn handle(
        &mut self,
        cx: &Context<Self>,
        message: resource::GetState<resource::mesh::State<()>>,
    ) -> anyhow::Result<()> {
        self.handle_get_state_msg(cx, message)
    }
}
895
896#[async_trait]
897impl<T: Controlled> Handler<resource::Stop> for ResourceController<T>
898where
899 resource::State<T::StateInner>: RemoteMessage,
900{
901 async fn handle(&mut self, cx: &Context<Self>, message: resource::Stop) -> anyhow::Result<()> {
902 let mesh_name = self.mesh.id().clone();
903 tracing::info!(
904 name = "ResourceControllerStatus",
905 %mesh_name,
906 reason = %message.reason,
907 "stopping mesh"
908 );
909 if self.monitor.take().is_none() {
910 tracing::debug!(
911 actor_id = %cx.self_addr(),
912 %mesh_name,
913 "duplicate stop request, mesh is already stopped",
914 );
915 return Ok(());
916 }
917 let display = self.supervision_display_name();
918 self.mesh
919 .handle_stop_request(cx, &display, message.reason, &mut self.health_state)
920 .await
921 }
922}
923
#[async_trait]
impl<T: Controlled> Handler<resource::WaitRankStatus> for ResourceController<T>
where
    resource::State<T::StateInner>: RemoteMessage,
{
    /// Forwards the request to the mesh's agents via
    /// `Controlled::forward_wait_rank_status`. The controller does not reply
    /// itself.
    async fn handle(
        &mut self,
        cx: &Context<Self>,
        msg: resource::WaitRankStatus,
    ) -> anyhow::Result<()> {
        self.mesh.forward_wait_rank_status(cx, msg)
    }
}
940
#[async_trait]
impl<T: Controlled> Handler<CheckState> for ResourceController<T>
where
    resource::State<T::StateInner>: RemoteMessage,
{
    /// Periodic supervision tick; see `handle_check_state`.
    async fn handle(
        &mut self,
        cx: &Context<Self>,
        CheckState(expected_time): CheckState,
    ) -> Result<(), anyhow::Error> {
        self.handle_check_state(cx, expected_time).await
    }
}
954
955#[async_trait]
956impl<T: Controlled> Handler<resource::State<T::StateInner>> for ResourceController<T>
957where
958 resource::State<T::StateInner>: RemoteMessage,
959{
960 async fn handle(
961 &mut self,
962 cx: &Context<Self>,
963 state: resource::State<T::StateInner>,
964 ) -> anyhow::Result<()> {
965 self.mesh.process_state(cx, state, &mut self.health_state);
966 self.stop_if_all_terminating();
967 Ok(())
968 }
969}
970
971#[async_trait]
973impl<A: Referable> Controlled for ActorMeshRef<A> {
974 type StateInner = ActorState;
975
976 fn stall_counter() -> &'static Counter<u64> {
977 &ACTOR_MESH_CONTROLLER_SUPERVISION_STALLS
978 }
979
980 fn id(&self) -> &ResourceId {
981 ActorMeshRef::id(self).resource_id()
982 }
983
984 fn region(&self) -> &ndslice::Region {
985 ndslice::view::Ranked::region(self)
986 }
987
988 fn subscribe_to_stream(
989 &self,
990 cx: &impl context::Actor,
991 subscriber: hyperactor::PortRef<resource::State<ActorState>>,
992 ) -> anyhow::Result<()> {
993 self.proc_mesh().agent_mesh().cast(
994 cx,
995 resource::StreamState::<ActorState> {
996 id: ActorMeshRef::id(self).resource_id().clone(),
997 subscriber,
998 },
999 )?;
1000 Ok(())
1001 }
1002
1003 fn forward_wait_rank_status(
1004 &self,
1005 cx: &impl context::Actor,
1006 msg: resource::WaitRankStatus,
1007 ) -> anyhow::Result<()> {
1008 self.proc_mesh().agent_mesh().cast(cx, msg)?;
1009 Ok(())
1010 }
1011
1012 async fn poll_states(
1013 &self,
1014 cx: &impl context::Actor,
1015 supervision_display_name: &str,
1016 health_state: &mut HealthState,
1017 ) -> PollResult {
1018 let mesh_name = Controlled::id(self);
1019
1020 let proc_states = self.proc_mesh().proc_states(cx, None).await;
1023 if let Err(e) = proc_states {
1024 send_state_change(
1025 cx,
1026 0,
1027 ActorSupervisionEvent::new(
1028 cx.instance().self_addr().clone(),
1029 None,
1030 ActorStatus::generic_failure(format!(
1031 "unable to query for proc states: {:?}",
1032 e
1033 )),
1034 None,
1035 ),
1036 mesh_name,
1037 false,
1038 health_state,
1039 );
1040 return PollResult::Reschedule;
1041 }
1042 if let Some(proc_states) = proc_states.unwrap() {
1043 if let Some((point, state)) = proc_states
1045 .iter()
1046 .find(|(_rank, state)| state.status.is_terminating())
1047 {
1048 let actor_status =
1052 proc_status_to_actor_status(state.state.and_then(|s| s.proc_status));
1053 let display = crate::actor_display_name(supervision_display_name, &point);
1054 send_state_change(
1055 cx,
1056 point.rank(),
1057 ActorSupervisionEvent::new(
1058 self.get(point.rank()).unwrap().actor_addr().clone(),
1061 Some(display),
1062 actor_status,
1063 None,
1064 ),
1065 mesh_name,
1066 true,
1067 health_state,
1068 );
1069 return PollResult::Reschedule;
1070 }
1071 }
1072
1073 let actor_states = self
1075 .actor_states_with_keepalive(cx, compute_keepalive())
1076 .await;
1077 match actor_states {
1078 Err(e) => {
1079 send_state_change(
1080 cx,
1081 0,
1082 ActorSupervisionEvent::new(
1083 cx.instance().self_addr().clone(),
1084 Some(supervision_display_name.to_string()),
1085 ActorStatus::generic_failure(format!(
1086 "unable to query for actor states: {:?}",
1087 e
1088 )),
1089 None,
1090 ),
1091 mesh_name,
1092 false,
1093 health_state,
1094 );
1095 PollResult::Reschedule
1096 }
1097 Ok(states) => {
1098 let did_notify =
1099 health_state.apply_updates_and_notify(&states, |state, health_state| {
1100 let (rank, events) = actor_state_to_supervision_events(state);
1101 if events.is_empty() {
1102 return false;
1103 }
1104 send_state_change(
1105 cx,
1106 rank,
1107 events[0].clone(),
1108 mesh_name,
1109 false,
1110 health_state,
1111 );
1112 true
1113 });
1114 PollResult::Processed { did_notify }
1115 }
1116 }
1117 }
1118
1119 fn process_state(
1120 &self,
1121 cx: &impl context::Actor,
1122 state: resource::State<ActorState>,
1123 health_state: &mut HealthState,
1124 ) -> bool {
1125 let (rank, events) = actor_state_to_supervision_events(state.clone());
1126 let Ok(point) = Controlled::region(self).extent().point_of_rank(rank) else {
1127 return false;
1128 };
1129
1130 let changed = health_state.maybe_update(point, state.status, state.generation);
1131
1132 if changed && !events.is_empty() {
1133 send_state_change(
1134 cx,
1135 rank,
1136 events[0].clone(),
1137 Controlled::id(self),
1138 false,
1139 health_state,
1140 );
1141 true
1142 } else {
1143 false
1144 }
1145 }
1146
1147 async fn handle_stop_request(
1148 &self,
1149 cx: &impl context::Actor,
1150 _supervision_display_name: &str,
1151 reason: String,
1152 health_state: &mut HealthState,
1153 ) -> anyhow::Result<()> {
1154 let mesh_name = Controlled::id(self);
1155 tracing::info!(
1156 actor_id = %cx.instance().self_addr(),
1157 actor_mesh = %mesh_name,
1158 "forwarding stop request from ActorMeshController to proc mesh"
1159 );
1160
1161 let rank = 0usize;
1168 let event = ActorSupervisionEvent::new(
1169 self.get(rank)
1170 .expect("mesh must have at least one rank")
1171 .actor_addr()
1172 .clone(),
1173 None,
1174 ActorStatus::Stopped("ActorMeshController received explicit stop request".to_string()),
1175 None,
1176 );
1177 let failure_message = MeshFailure {
1178 actor_mesh_name: Some(mesh_name.to_string()),
1179 event,
1180 crashed_ranks: vec![],
1181 };
1182 health_state.unhealthy_event = Some(Unhealthy::StreamClosed(failure_message.clone()));
1183 for subscriber in health_state.subscribers.iter() {
1187 send_subscriber_message(cx, subscriber, failure_message.clone());
1188 }
1189
1190 let max_rank = health_state.statuses.keys().map(|p| p.rank()).max();
1193 let extent = health_state
1194 .statuses
1195 .keys()
1196 .next()
1197 .map(|p| p.extent().clone());
1198
1199 let result = self
1201 .proc_mesh()
1202 .stop_actor_by_id(cx, ActorMeshRef::id(self).clone(), reason)
1203 .await;
1204
1205 match result {
1206 Ok(statuses) => {
1207 for (rank, status) in statuses.iter() {
1209 health_state
1210 .statuses
1211 .entry(rank)
1212 .and_modify(move |s| *s = (status, u64::MAX));
1213 }
1214 }
1215 Err(crate::Error::ActorStopError { statuses }) => {
1216 if let Some(max_rank) = max_rank {
1217 let extent = extent.expect("no actors in mesh");
1218 for (rank, status) in statuses.materialized_iter(max_rank).enumerate() {
1219 *health_state
1220 .statuses
1221 .get_mut(&extent.point_of_rank(rank).expect("illegal rank"))
1222 .unwrap() = (status.clone(), u64::MAX);
1223 }
1224 }
1225 }
1226 Err(e) => {
1227 return Err(e.into());
1228 }
1229 }
1230
1231 tracing::info!(
1232 actor_id = %cx.instance().self_addr(),
1233 actor_mesh = %mesh_name,
1234 "stopped mesh"
1235 );
1236 Ok(())
1237 }
1238
1239 async fn cleanup_stop(&self, cx: &impl context::Actor, reason: String) -> anyhow::Result<()> {
1240 self.proc_mesh()
1241 .stop_actor_by_id(cx, ActorMeshRef::id(self).clone(), reason)
1242 .await?;
1243 Ok(())
1244 }
1245}
1246
/// Controller specialized to proc meshes.
pub(crate) type ProcMeshController = ResourceController<ProcMeshRef>;
1249
1250#[async_trait]
1252impl Controlled for ProcMeshRef {
1253 type StateInner = crate::host_mesh::host_agent::ProcState;
1254
/// Counter that `check_stall` increments when a proc mesh controller's
/// `CheckState` is handled late.
fn stall_counter() -> &'static Counter<u64> {
    &PROC_MESH_CONTROLLER_SUPERVISION_STALLS
}

/// Resource id of this proc mesh.
fn id(&self) -> &ResourceId {
    ProcMeshRef::id(self).resource_id()
}

/// Region of this proc mesh, as reported by `Ranked::region`.
fn region(&self) -> &ndslice::Region {
    ndslice::view::Ranked::region(self)
}
1266
1267 fn subscribe_to_stream(
1268 &self,
1269 cx: &impl context::Actor,
1270 subscriber: hyperactor::PortRef<resource::State<Self::StateInner>>,
1271 ) -> anyhow::Result<()> {
1272 for proc_id in self.proc_ids() {
1274 let proc_resource_id = ResourceId::new(proc_id.uid().clone(), proc_id.label().cloned());
1275 let host = crate::host_mesh::HostRef(proc_id.addr().clone());
1276 host.mesh_agent().post(
1277 cx,
1278 resource::StreamState::<Self::StateInner> {
1279 id: proc_resource_id,
1280 subscriber: subscriber.clone(),
1281 },
1282 );
1283 }
1284 Ok(())
1285 }
1286
1287 fn forward_wait_rank_status(
1288 &self,
1289 cx: &impl context::Actor,
1290 msg: resource::WaitRankStatus,
1291 ) -> anyhow::Result<()> {
1292 for proc_id in self.proc_ids() {
1293 let host = crate::host_mesh::HostRef(proc_id.addr().clone());
1294 host.mesh_agent().post(cx, msg.clone());
1295 }
1296 Ok(())
1297 }
1298
1299 async fn poll_states(
1300 &self,
1301 cx: &impl context::Actor,
1302 supervision_display_name: &str,
1303 health_state: &mut HealthState,
1304 ) -> PollResult {
1305 let mesh_name = Controlled::id(self);
1306
1307 let proc_states = self.proc_states(cx, compute_keepalive()).await;
1308 match proc_states {
1309 Err(e) => {
1310 send_state_change(
1311 cx,
1312 0,
1313 ActorSupervisionEvent::new(
1314 cx.instance().self_addr().clone(),
1315 Some(supervision_display_name.to_string()),
1316 ActorStatus::generic_failure(format!(
1317 "unable to query for proc states: {:?}",
1318 e
1319 )),
1320 None,
1321 ),
1322 mesh_name,
1323 false,
1324 health_state,
1325 );
1326 PollResult::Reschedule
1327 }
1328 Ok(None) => PollResult::Processed { did_notify: false },
1329 Ok(Some(states)) => {
1330 let did_notify =
1331 health_state.apply_updates_and_notify(&states, |state, health_state| {
1332 self.notify_proc_state_change(
1333 cx,
1334 supervision_display_name,
1335 state,
1336 health_state,
1337 )
1338 });
1339 PollResult::Processed { did_notify }
1340 }
1341 }
1342 }
1343
1344 fn process_state(
1345 &self,
1346 cx: &impl context::Actor,
1347 state: resource::State<Self::StateInner>,
1348 health_state: &mut HealthState,
1349 ) -> bool {
1350 let Ok(point) = Controlled::region(self).extent().point_of_rank(
1351 state
1352 .state
1353 .as_ref()
1354 .map(|s| s.create_rank)
1355 .unwrap_or(usize::MAX),
1356 ) else {
1357 return false;
1358 };
1359 let changed = health_state.maybe_update(point, state.status.clone(), state.generation);
1360 if !changed {
1361 return false;
1362 }
1363 let display = Controlled::id(self).to_string();
1364 self.notify_proc_state_change(cx, &display, state, health_state)
1365 }
1366
1367 async fn handle_stop_request(
1368 &self,
1369 cx: &impl context::Actor,
1370 _supervision_display_name: &str,
1371 reason: String,
1372 health_state: &mut HealthState,
1373 ) -> anyhow::Result<()> {
1374 let mesh_name = Controlled::id(self);
1375 tracing::info!(
1376 actor_id = %cx.instance().self_addr(),
1377 proc_mesh = %mesh_name,
1378 "ProcMeshController stopping proc mesh"
1379 );
1380 let event = ActorSupervisionEvent::new(
1382 cx.instance().self_addr().clone(),
1383 None,
1384 ActorStatus::Stopped("ProcMeshController received explicit stop request".to_string()),
1385 None,
1386 );
1387 let failure_message = MeshFailure {
1388 actor_mesh_name: Some(mesh_name.to_string()),
1389 event,
1390 crashed_ranks: vec![],
1391 };
1392 health_state.unhealthy_event = Some(Unhealthy::StreamClosed(failure_message.clone()));
1393 for subscriber in health_state.subscribers.iter() {
1394 send_subscriber_message(cx, subscriber, failure_message.clone());
1395 }
1396
1397 let names = self.proc_ids().collect::<Vec<hyperactor::ProcAddr>>();
1398 let region = Ranked::region(self).clone();
1399 let Some(hosts) = self.hosts() else {
1400 return Ok(());
1401 };
1402 let max_rank = health_state.statuses.keys().map(|p| p.rank()).max();
1408 let extent = health_state
1409 .statuses
1410 .keys()
1411 .next()
1412 .map(|p| p.extent().clone());
1413 match hosts
1414 .stop_proc_mesh(cx, self.id(), names, region, reason)
1415 .await
1416 {
1417 Ok(statuses) => {
1418 for (rank, status) in statuses.iter() {
1419 health_state
1420 .statuses
1421 .entry(rank)
1422 .and_modify(move |s| *s = (status, u64::MAX));
1423 }
1424 Ok(())
1425 }
1426 Err(crate::Error::ProcMeshStopError { statuses }) => {
1427 if let (Some(max_rank), Some(extent)) = (max_rank, extent) {
1428 for (rank, status) in statuses.materialized_iter(max_rank).enumerate() {
1429 if let Ok(point) = extent.point_of_rank(rank) {
1430 health_state
1431 .statuses
1432 .entry(point)
1433 .and_modify(|s| *s = (status.clone(), u64::MAX));
1434 }
1435 }
1436 }
1437 Err(crate::Error::ProcMeshStopError { statuses }.into())
1438 }
1439 Err(e) => Err(e.into()),
1440 }
1441 }
1442
1443 async fn cleanup_stop(&self, cx: &impl context::Actor, reason: String) -> anyhow::Result<()> {
1444 let names = self.proc_ids().collect::<Vec<hyperactor::ProcAddr>>();
1445 let region = Ranked::region(self).clone();
1446 if let Some(hosts) = self.hosts() {
1447 hosts
1448 .stop_proc_mesh(cx, self.id(), names, region, reason)
1449 .await?;
1450 }
1451 Ok(())
1452 }
1453}
1454
1455impl ProcMeshRef {
1456 fn notify_proc_state_change(
1460 &self,
1461 cx: &impl context::Actor,
1462 supervision_display_name: &str,
1463 state: resource::State<crate::host_mesh::host_agent::ProcState>,
1464 health_state: &mut HealthState,
1465 ) -> bool {
1466 let create_rank = state.state.as_ref().map(|s| s.create_rank);
1467 let actor_status = proc_status_to_actor_status(state.state.and_then(|s| s.proc_status));
1468 let event = ActorSupervisionEvent::new(
1469 cx.instance().self_addr().clone(),
1470 Some(supervision_display_name.to_string()),
1471 actor_status,
1472 None,
1473 );
1474 let rank = create_rank
1475 .and_then(|r| {
1476 ndslice::view::Ranked::region(self)
1477 .extent()
1478 .point_of_rank(r)
1479 .ok()
1480 })
1481 .map(|p| p.rank())
1482 .unwrap_or(0);
1483 send_state_change(cx, rank, event, Controlled::id(self), true, health_state);
1484 true
1485 }
1486}
1487
#[cfg(test)]
mod tests {
    use std::ops::Deref;
    use std::time::Duration;

    use hyperactor::actor::ActorStatus;
    use hyperactor::id::Label;
    use ndslice::Extent;
    use ndslice::ViewExt;

    #[cfg(fbcode_build)]
    use super::SUPERVISION_POLL_FREQUENCY;
    use super::proc_status_to_actor_status;
    use crate::ActorMesh;
    use crate::bootstrap::ProcStatus;
    #[cfg(fbcode_build)]
    use crate::host_mesh::PROC_SPAWN_MAX_IDLE;
    use crate::mesh_id::ActorMeshId;
    #[cfg(fbcode_build)]
    use crate::mesh_id::HostMeshId;
    use crate::proc_agent::MESH_ORPHAN_TIMEOUT;
    use crate::resource;
    #[cfg(fbcode_build)]
    use crate::supervision::MeshFailure;
    use crate::test_utils::local_host_mesh;
    use crate::testactor;
    use crate::testing;

    /// A host mesh whose hosts are separately spawned bootstrap processes.
    ///
    /// Keeps the mesh's shutdown guard together with the host child processes,
    /// so a test can kill the hosts directly (see `kill_hosts`).
    #[cfg(fbcode_build)]
    struct TestHostMesh {
        guard: crate::host_mesh::HostMeshShutdownGuard,
        children: Vec<tokio::process::Child>,
    }

    #[cfg(fbcode_build)]
    impl TestHostMesh {
        /// Sends a kill to every host child process and waits for each to exit,
        /// ignoring errors from either step, then forgets the children.
        async fn kill_hosts(&mut self) {
            for child in &mut self.children {
                let _ = child.start_kill();
                let _ = child.wait().await;
            }
            self.children.clear();
        }
    }

    // `Deref`/`DerefMut` expose the wrapped shutdown guard, so mesh methods
    // used by the tests (`spawn`, `shutdown`) can be called on a
    // `TestHostMesh` directly.
    #[cfg(fbcode_build)]
    impl std::ops::Deref for TestHostMesh {
        type Target = crate::host_mesh::HostMeshShutdownGuard;

        fn deref(&self) -> &Self::Target {
            &self.guard
        }
    }

    #[cfg(fbcode_build)]
    impl std::ops::DerefMut for TestHostMesh {
        fn deref_mut(&mut self) -> &mut Self::Target {
            &mut self.guard
        }
    }

    /// Actors must be stopped once their keepalive expires without being refreshed.
    ///
    /// The test:
    /// 1. Sets `MESH_ORPHAN_TIMEOUT` to 1s.
    /// 2. Spawns a `TestActor` mesh.
    /// 3. Queries its states once with a keepalive 2s in the future, and
    ///    asserts they are all `Running`.
    /// 4. Polls (for up to 30s) until every actor reports `Stopped`.
    #[tokio::test]
    async fn test_orphaned_actors_are_cleaned_up() {
        let config = hyperactor_config::global::lock();
        let _orphan = config.override_key(MESH_ORPHAN_TIMEOUT, Some(Duration::from_secs(1)));

        let instance = testing::instance();
        let host_mesh = local_host_mesh(2).await;
        let proc_mesh = host_mesh
            .spawn(instance, "test", Extent::unity(), None, None)
            .await
            .unwrap();

        let actor_name = ActorMeshId::instance(Label::new("orphan_test").unwrap());
        let actor_mesh: ActorMesh<testactor::TestActor> = proc_mesh
            .spawn_with_name(instance, actor_name.clone(), &(), None, true)
            .await
            .unwrap();
        assert!(
            actor_mesh.deref().extent().num_ranks() > 0,
            "should have spawned at least one actor"
        );

        // Single keepalive, 2s out. The polling loop below uses `actor_states`
        // (no keepalive argument), which presumably does not extend it.
        let states = proc_mesh
            .actor_states_with_keepalive(
                instance,
                actor_name.clone(),
                Some(std::time::SystemTime::now() + Duration::from_secs(2)),
            )
            .await
            .unwrap();
        for state in states.values() {
            assert_eq!(
                state.status,
                resource::Status::Running,
                "actor should be running before expiry"
            );
        }

        let deadline = tokio::time::Instant::now() + Duration::from_secs(30);
        loop {
            let states = proc_mesh
                .actor_states(instance, actor_name.clone())
                .await
                .unwrap();
            // NOTE(review): `all` is vacuously true for an empty `states`.
            if states
                .values()
                .all(|s| s.status == resource::Status::Stopped)
            {
                break;
            }
            assert!(
                tokio::time::Instant::now() < deadline,
                "timed out waiting for actors to be stopped after keepalive expiry"
            );
            tokio::time::sleep(Duration::from_millis(200)).await;
        }
    }

    /// Launches `n` bootstrap host processes and returns a `TestHostMesh` over them.
    ///
    /// Each process gets an address from `ChannelTransport::Unix.any()` and the
    /// current global config attrs. The child handles are kept so the test can
    /// kill the hosts later.
    #[cfg(fbcode_build)]
    async fn host_mesh_with_config(n: usize) -> TestHostMesh {
        use hyperactor::channel::ChannelTransport;
        use tokio::process::Command;

        let program = crate::testresource::get("monarch/hyperactor_mesh/bootstrap");
        let mut host_addrs = vec![];
        let mut children = Vec::new();
        for _ in 0..n {
            host_addrs.push(ChannelTransport::Unix.any());
        }

        for host in host_addrs.iter() {
            let mut cmd = Command::new(program.clone());
            let boot = crate::Bootstrap::Host {
                addr: host.clone(),
                command: None,
                config: Some(hyperactor_config::global::attrs()),
                exit_on_shutdown: false,
            };
            boot.to_env(&mut cmd);
            // Dropping the `Child` handle must not kill the host. Hosts are torn
            // down explicitly via `kill_hosts`, or by the pre-exec hook below.
            cmd.kill_on_drop(false);
            // SAFETY: `pre_exec` runs the hook in the forked child before `exec`,
            // where only async-signal-safe operations are allowed.
            // `install_pdeathsig_kill` is presumably limited to such calls
            // (e.g. setting a parent-death signal) — confirm at its definition.
            unsafe {
                cmd.pre_exec(crate::bootstrap::install_pdeathsig_kill);
            }
            children.push(cmd.spawn().unwrap());
        }

        let host_mesh = crate::HostMeshRef::from_hosts(
            HostMeshId::instance(Label::new("test").unwrap()),
            host_addrs,
        );
        TestHostMesh {
            guard: crate::host_mesh::HostMesh::take(host_mesh).shutdown_guard(),
            children,
        }
    }

    /// Orphaned actors must still be stopped after their controller's host dies.
    ///
    /// Config overrides:
    /// * `MESH_ORPHAN_TIMEOUT` = 2s
    /// * `SUPERVISION_POLL_FREQUENCY` = 1s
    /// * `PROC_SPAWN_MAX_IDLE` and `HOST_SPAWN_READY_TIMEOUT` raised to 60s
    ///
    /// The test:
    /// 1. Builds two host meshes: one for actors and one for a "controller".
    /// 2. Spawns a `WrapperActor` on the controller mesh, passing it the actor
    ///    proc mesh, a supervision port and `child_name`. The test then expects
    ///    actors named `child_name` on the actor proc mesh, so the wrapper
    ///    presumably spawns them.
    /// 3. Waits 3s and checks the children are `Running`.
    /// 4. Kills the controller's host processes.
    /// 5. Polls (for up to 30s) until every child reports `Stopped`.
    #[tokio::test]
    #[cfg(fbcode_build)]
    async fn test_orphaned_actors_cleaned_up_on_controller_crash() {
        let config = hyperactor_config::global::lock();
        let _orphan = config.override_key(MESH_ORPHAN_TIMEOUT, Some(Duration::from_secs(2)));
        let _poll = config.override_key(SUPERVISION_POLL_FREQUENCY, Duration::from_secs(1));
        let _proc_spawn = config.override_key(PROC_SPAWN_MAX_IDLE, Duration::from_secs(60));
        let _host_spawn = config.override_key(
            hyperactor::config::HOST_SPAWN_READY_TIMEOUT,
            Duration::from_secs(60),
        );

        let instance = testing::instance();
        let num_replicas = 1;

        let mut actor_hm = host_mesh_with_config(num_replicas).await;
        let actor_proc_mesh = actor_hm
            .spawn(instance, "actors", Extent::unity(), None, None)
            .await
            .unwrap();

        let mut controller_hm = host_mesh_with_config(1).await;
        let controller_proc_mesh = controller_hm
            .spawn(instance, "controller", Extent::unity(), None, None)
            .await
            .unwrap();

        let child_name = ActorMeshId::instance(Label::new("orphan_child").unwrap());

        // The receiver is bound to a named local (not `_`) so it stays alive
        // for the whole test.
        let (supervision_port, _supervision_receiver) = instance.open_port::<MeshFailure>();
        let supervisor = supervision_port.bind();

        let _wrapper_mesh: ActorMesh<testactor::WrapperActor> = controller_proc_mesh
            .spawn(
                instance,
                "wrapper",
                &(
                    actor_proc_mesh.deref().clone(),
                    supervisor,
                    child_name.clone(),
                ),
            )
            .await
            .unwrap();

        // Give the wrapper time to bring its children up before checking them.
        tokio::time::sleep(Duration::from_secs(3)).await;
        let states = actor_proc_mesh
            .actor_states(instance, child_name.clone())
            .await
            .unwrap();
        for state in states.values() {
            assert_eq!(
                state.status,
                resource::Status::Running,
                "actor should be running before controller crash"
            );
        }

        controller_hm.kill_hosts().await;

        let deadline = tokio::time::Instant::now() + Duration::from_secs(30);
        loop {
            let states = actor_proc_mesh
                .actor_states(instance, child_name.clone())
                .await
                .unwrap();
            if states
                .values()
                .all(|s| s.status == resource::Status::Stopped)
            {
                break;
            }
            assert!(
                tokio::time::Instant::now() < deadline,
                "timed out waiting for actors to be stopped after controller crash and orphan timeout"
            );
            tokio::time::sleep(Duration::from_millis(200)).await;
        }

        let _ = actor_hm.shutdown(instance).await;
    }

    // Unit tests for `proc_status_to_actor_status`. Statuses map as follows:
    // * `Stopped`: a clean exit (code 0), an in-progress stop, or no status.
    // * `Failed`: a nonzero exit, a signal kill, or an explicit failure.
    #[test]
    fn test_proc_status_to_actor_status_stopped_cleanly() {
        let status = proc_status_to_actor_status(Some(ProcStatus::Stopped {
            exit_code: 0,
            stderr_tail: vec![],
        }));
        assert!(
            matches!(status, ActorStatus::Stopped(ref msg) if msg.contains("cleanly")),
            "expected Stopped, got {:?}",
            status
        );
    }

    #[test]
    fn test_proc_status_to_actor_status_nonzero_exit() {
        let status = proc_status_to_actor_status(Some(ProcStatus::Stopped {
            exit_code: 1,
            stderr_tail: vec![],
        }));
        assert!(
            matches!(status, ActorStatus::Failed(_)),
            "expected Failed, got {:?}",
            status
        );
    }

    #[test]
    fn test_proc_status_to_actor_status_stopping_is_not_a_failure() {
        let status = proc_status_to_actor_status(Some(ProcStatus::Stopping {
            started_at: std::time::SystemTime::now(),
        }));
        assert!(
            matches!(status, ActorStatus::Stopped(ref msg) if msg.contains("stopping")),
            "expected Stopped, got {:?}",
            status
        );
    }

    #[test]
    fn test_proc_status_to_actor_status_none() {
        let status = proc_status_to_actor_status(None);
        assert!(
            matches!(status, ActorStatus::Stopped(_)),
            "expected Stopped, got {:?}",
            status
        );
    }

    #[test]
    fn test_proc_status_to_actor_status_killed() {
        let status = proc_status_to_actor_status(Some(ProcStatus::Killed {
            signal: 9,
            core_dumped: false,
        }));
        assert!(
            matches!(status, ActorStatus::Failed(_)),
            "expected Failed, got {:?}",
            status
        );
    }

    #[test]
    fn test_proc_status_to_actor_status_failed() {
        let status = proc_status_to_actor_status(Some(ProcStatus::Failed {
            reason: "oom".to_string(),
        }));
        assert!(
            matches!(status, ActorStatus::Failed(_)),
            "expected Failed, got {:?}",
            status
        );
    }
}