1use std::collections::HashMap;
10use std::collections::HashSet;
11use std::fmt::Debug;
12
13use async_trait::async_trait;
14use hyperactor::Actor;
15use hyperactor::Bind;
16use hyperactor::Context;
17use hyperactor::Handler;
18use hyperactor::Instance;
19use hyperactor::Unbind;
20use hyperactor::actor::ActorError;
21use hyperactor::actor::ActorErrorKind;
22use hyperactor::actor::ActorStatus;
23use hyperactor::actor::Referable;
24use hyperactor::actor::handle_undeliverable_message;
25use hyperactor::context;
26use hyperactor::kv_pairs;
27use hyperactor::mailbox::MessageEnvelope;
28use hyperactor::mailbox::Undeliverable;
29use hyperactor::reference as hyperactor_reference;
30use hyperactor::supervision::ActorSupervisionEvent;
31use hyperactor_config::CONFIG;
32use hyperactor_config::ConfigAttr;
33use hyperactor_config::Flattrs;
34use hyperactor_config::attrs::declare_attrs;
35use hyperactor_telemetry::declare_static_counter;
36use ndslice::ViewExt;
37use ndslice::view::CollectMeshExt;
38use ndslice::view::Point;
39use ndslice::view::Ranked;
40use serde::Deserialize;
41use serde::Serialize;
42use tokio::time::Duration;
43use typeuri::Named;
44
45use crate::Name;
46use crate::ValueMesh;
47use crate::actor_mesh::ActorMeshRef;
48use crate::bootstrap::ProcStatus;
49use crate::casting::CAST_ACTOR_MESH_ID;
50use crate::casting::update_undeliverable_envelope_for_casting;
51use crate::host_mesh::HostMeshRef;
52use crate::proc_agent::ActorState;
53use crate::proc_agent::MESH_ORPHAN_TIMEOUT;
54use crate::proc_mesh::ProcMeshRef;
55use crate::resource;
56use crate::supervision::MeshFailure;
57use crate::supervision::Unhealthy;
58
/// Well-known name under which the actor mesh controller is registered.
pub const ACTOR_MESH_CONTROLLER_NAME: &str = "actor_mesh_controller";
61
// How often the controller polls proc/actor state for supervision (driven by
// the `CheckState` self-message). Overridable via the
// HYPERACTOR_MESH_SUPERVISION_POLL_FREQUENCY environment variable.
declare_attrs! {
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_MESH_SUPERVISION_POLL_FREQUENCY".to_string()),
        None,
    ))
    pub attr SUPERVISION_POLL_FREQUENCY: Duration = Duration::from_secs(10);
}
76
// Incremented whenever the `CheckState` handler runs later than one full poll
// interval past its scheduled time, i.e. the controller's queue is stalled.
declare_static_counter!(
    ACTOR_MESH_CONTROLLER_SUPERVISION_STALLS,
    "actor.actor_mesh_controller.num_stalls"
);
81
/// Aggregated supervision bookkeeping for one actor mesh.
#[derive(Debug)]
struct HealthState {
    // Per-rank status, paired with the generation at which it was last observed.
    statuses: HashMap<Point, (resource::Status, u64)>,
    // Set once the mesh has been observed unhealthy; replayed to late subscribers.
    unhealthy_event: Option<Unhealthy>,
    // Ranks observed crashed, with the supervision event that reported each.
    crashed_ranks: HashMap<usize, ActorSupervisionEvent>,
    // Optional owner port that receives failure events directly.
    owner: Option<hyperactor_reference::PortRef<MeshFailure>>,
    // Ports subscribed to notifications; a `None` payload acts as a heartbeat.
    subscribers: HashSet<hyperactor_reference::PortRef<Option<MeshFailure>>>,
}
95
96impl HealthState {
97 fn new(
98 statuses: HashMap<Point, resource::Status>,
99 owner: Option<hyperactor_reference::PortRef<MeshFailure>>,
100 ) -> Self {
101 Self {
102 statuses: statuses
103 .into_iter()
104 .map(|(point, status)| (point, (status, 0)))
105 .collect(),
106 unhealthy_event: None,
107 crashed_ranks: HashMap::new(),
108 owner,
109 subscribers: HashSet::new(),
110 }
111 }
112
113 fn maybe_update(&mut self, point: Point, status: resource::Status, generation: u64) -> bool {
117 use std::collections::hash_map::Entry;
118 match self.statuses.entry(point) {
119 Entry::Occupied(mut entry) => {
120 let (old_status, old_gen) = entry.get();
121 if old_status.is_terminating() || *old_gen > generation {
124 return false;
125 }
126 let changed = *old_status != status;
127 *entry.get_mut() = (status, generation);
128 changed
129 }
130 Entry::Vacant(entry) => {
131 entry.insert((status, generation));
132 true
133 }
134 }
135 }
136}
137
/// Register the given port to receive mesh failure notifications (and
/// `None`-payload heartbeats) from the controller.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Named, Bind, Unbind)]
pub struct Subscribe(pub hyperactor_reference::PortRef<Option<MeshFailure>>);
145
/// Remove the given port from the controller's subscriber set.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Named, Bind, Unbind)]
pub struct Unsubscribe(pub hyperactor_reference::PortRef<Option<MeshFailure>>);
150
/// Request the number of currently registered subscribers; the count is sent
/// back on the provided reply port.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Named, Bind, Unbind)]
pub struct GetSubscriberCount(#[binding(include)] pub hyperactor_reference::PortRef<usize>);
154
/// Self-message that drives the periodic supervision poll. Carries the time
/// at which the handler was expected to run, used to detect stalls.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Named, Bind, Unbind)]
pub struct CheckState(pub std::time::SystemTime);
161
/// Supervises a single actor mesh: tracks per-rank health, notifies the owner
/// and subscribers of failures, and stops the mesh when it is torn down.
#[hyperactor::export(handlers = [
    Subscribe,
    Unsubscribe,
    GetSubscriberCount,
    resource::State<ActorState>,
    resource::CreateOrUpdate<resource::mesh::Spec<()>> { cast = true },
    resource::GetState<resource::mesh::State<()>> { cast = true },
    resource::Stop { cast = true },
])]
pub struct ActorMeshController<A>
where
    A: Referable,
{
    // The actor mesh being supervised.
    mesh: ActorMeshRef<A>,
    // Human-readable name used when reporting supervision events.
    supervision_display_name: String,
    // Aggregated per-rank health bookkeeping.
    health_state: HealthState,
    // `Some(())` while supervision polling is active; taken on stop/cleanup.
    monitor: Option<()>,
}
191
// The controller exposes no spec/state payload beyond the per-rank statuses.
impl<A: Referable> resource::mesh::Mesh for ActorMeshController<A> {
    type Spec = ();
    type State = ();
}
196
197impl<A: Referable> ActorMeshController<A> {
198 pub(crate) fn new(
200 mesh: ActorMeshRef<A>,
201 supervision_display_name: Option<String>,
202 port: Option<hyperactor_reference::PortRef<MeshFailure>>,
203 initial_statuses: ValueMesh<resource::Status>,
204 ) -> Self {
205 let supervision_display_name =
206 supervision_display_name.unwrap_or_else(|| mesh.name().to_string());
207 Self {
208 mesh,
209 supervision_display_name,
210 health_state: HealthState::new(initial_statuses.iter().collect(), port),
211 monitor: None,
212 }
213 }
214
215 async fn stop(
216 &self,
217 cx: &impl context::Actor,
218 reason: String,
219 ) -> crate::Result<ValueMesh<resource::Status>> {
220 self.mesh
222 .proc_mesh()
223 .stop_actor_by_name(cx, self.mesh.name().clone(), reason)
224 .await
225 }
226
227 fn self_check_state_message(&self, cx: &Instance<Self>) -> Result<(), ActorError> {
228 if self.monitor.is_some() {
230 let delay = hyperactor_config::global::get(SUPERVISION_POLL_FREQUENCY);
233 cx.self_message_with_delay(CheckState(std::time::SystemTime::now() + delay), delay)
234 } else {
235 Ok(())
236 }
237 }
238}
239
// Header flag stamped on subscriber-bound messages so undeliverable returns
// can be recognized as subscriber traffic and the dead subscriber pruned.
declare_attrs! {
    pub attr ACTOR_MESH_SUBSCRIBER_MESSAGE: bool;
}
245
246fn send_subscriber_message(
247 cx: &impl context::Actor,
248 subscriber: &hyperactor_reference::PortRef<Option<MeshFailure>>,
249 message: MeshFailure,
250) {
251 let mut headers = Flattrs::new();
252 headers.set(ACTOR_MESH_SUBSCRIBER_MESSAGE, true);
253 if let Err(error) = subscriber.send_with_headers(cx, headers, Some(message.clone())) {
254 tracing::warn!(
255 event = %message,
256 "failed to send supervision event to subscriber {}: {}",
257 subscriber.port_id(),
258 error
259 );
260 } else {
261 tracing::info!(event = %message, "sent supervision failure message to subscriber {}", subscriber.port_id());
262 }
263}
264
265impl<A: Referable> Debug for ActorMeshController<A> {
266 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
267 f.debug_struct("MeshController")
268 .field("mesh", &self.mesh)
269 .field("health_state", &self.health_state)
270 .field("monitor", &self.monitor)
271 .finish()
272 }
273}
274
#[async_trait]
impl<A: Referable> Actor for ActorMeshController<A> {
    /// Start supervision: mark the monitor active, kick off the periodic
    /// `CheckState` poll, and subscribe to per-actor state updates from the
    /// proc mesh's agents.
    async fn init(&mut self, this: &Instance<Self>) -> Result<(), anyhow::Error> {
        this.set_system();
        self.monitor = Some(());
        self.self_check_state_message(this)?;

        // Ask every proc agent to stream actor-state changes for this mesh
        // back to our own `resource::State<ActorState>` handler.
        self.mesh.proc_mesh().agent_mesh().cast(
            this,
            resource::StreamState::<ActorState> {
                name: self.mesh.name().clone(),
                subscriber: hyperactor_reference::PortRef::<resource::State<ActorState>>::attest_message_port(this.self_id()).unsplit(),
            },
        )?;

        let owner = if let Some(owner) = &self.health_state.owner {
            owner.to_string()
        } else {
            String::from("None")
        };
        tracing::info!(actor_id = %this.self_id(), %owner, "started mesh controller for {}", self.mesh.name());
        Ok(())
    }

    /// On controller teardown, stop the supervised actor mesh (at most once).
    async fn cleanup(
        &mut self,
        this: &Instance<Self>,
        _err: Option<&ActorError>,
    ) -> Result<(), anyhow::Error> {
        // `take()` guards against double-stopping when an explicit Stop
        // request has already deactivated the monitor.
        if self.monitor.take().is_some() {
            tracing::info!(actor_id = %this.self_id(), actor_mesh = %self.mesh.name(), "starting cleanup for ActorMeshController, stopping actor mesh");
            self.stop(this, "actor mesh controller cleanup".to_string())
                .await?;
        }
        Ok(())
    }

    /// Handle message bounces: prune dead subscribers, log-and-ignore failed
    /// cast messages, and fall back to default handling for anything else.
    async fn handle_undeliverable_message(
        &mut self,
        cx: &Instance<Self>,
        mut envelope: Undeliverable<MessageEnvelope>,
    ) -> Result<(), anyhow::Error> {
        envelope = update_undeliverable_envelope_for_casting(envelope);
        if let Some(true) = envelope.0.headers().get(ACTOR_MESH_SUBSCRIBER_MESSAGE) {
            // A subscriber port bounced; drop it so we stop notifying it.
            let dest_port_id = envelope.0.dest().clone();
            let port = hyperactor_reference::PortRef::<Option<MeshFailure>>::attest(dest_port_id);
            let did_exist = self.health_state.subscribers.remove(&port);
            if did_exist {
                tracing::debug!(
                    actor_id = %cx.self_id(),
                    num_subscribers = self.health_state.subscribers.len(),
                    "ActorMeshController: handle_undeliverable_message: removed subscriber {} from mesh controller",
                    port.port_id()
                );
            }
            Ok(())
        } else if envelope.0.headers().get(CAST_ACTOR_MESH_ID).is_some() {
            // Undeliverable casts are only logged here; presumably failures
            // surface through the supervision poll instead — TODO confirm.
            tracing::warn!(
                actor_id = %cx.self_id(),
                dest = %envelope.0.dest(),
                "ActorMeshController: ignoring undeliverable cast message",
            );
            Ok(())
        } else {
            handle_undeliverable_message(cx, envelope)
        }
    }
}
380
381#[async_trait]
382impl<A: Referable> Handler<Subscribe> for ActorMeshController<A> {
383 async fn handle(&mut self, cx: &Context<Self>, message: Subscribe) -> anyhow::Result<()> {
384 if let Some(unhealthy) = &self.health_state.unhealthy_event {
394 let msg = match unhealthy {
395 Unhealthy::StreamClosed(msg) | Unhealthy::Crashed(msg) => msg,
396 };
397 let mut replay_msg = msg.clone();
398 replay_msg.crashed_ranks = self.health_state.crashed_ranks.keys().copied().collect();
399 send_subscriber_message(cx, &message.0, replay_msg);
400 }
401 let port_id = message.0.port_id().clone();
402 if self.health_state.subscribers.insert(message.0) {
403 tracing::debug!(actor_id = %cx.self_id(), num_subscribers = self.health_state.subscribers.len(), "added subscriber {} to mesh controller", port_id);
404 }
405 Ok(())
406 }
407}
408
409#[async_trait]
410impl<A: Referable> Handler<Unsubscribe> for ActorMeshController<A> {
411 async fn handle(&mut self, cx: &Context<Self>, message: Unsubscribe) -> anyhow::Result<()> {
412 if self.health_state.subscribers.remove(&message.0) {
413 tracing::debug!(actor_id = %cx.self_id(), num_subscribers = self.health_state.subscribers.len(), "removed subscriber {} from mesh controller", message.0.port_id());
414 }
415 Ok(())
416 }
417}
418
419#[async_trait]
420impl<A: Referable> Handler<GetSubscriberCount> for ActorMeshController<A> {
421 async fn handle(
422 &mut self,
423 cx: &Context<Self>,
424 message: GetSubscriberCount,
425 ) -> anyhow::Result<()> {
426 message.0.send(cx, self.health_state.subscribers.len())?;
427 Ok(())
428 }
429}
430
#[async_trait]
impl<A: Referable> Handler<resource::CreateOrUpdate<resource::mesh::Spec<()>>>
    for ActorMeshController<A>
{
    /// No-op: the spec type is `()`, so there is nothing to create or update.
    async fn handle(
        &mut self,
        _cx: &Context<Self>,
        _message: resource::CreateOrUpdate<resource::mesh::Spec<()>>,
    ) -> anyhow::Result<()> {
        Ok(())
    }
}
445
446#[async_trait]
447impl<A: Referable> Handler<resource::GetState<resource::mesh::State<()>>>
448 for ActorMeshController<A>
449{
450 async fn handle(
451 &mut self,
452 cx: &Context<Self>,
453 message: resource::GetState<resource::mesh::State<()>>,
454 ) -> anyhow::Result<()> {
455 let status = if let Some(Unhealthy::Crashed(e)) = &self.health_state.unhealthy_event {
456 resource::Status::Failed(e.to_string())
457 } else if let Some(Unhealthy::StreamClosed(_)) = &self.health_state.unhealthy_event {
458 resource::Status::Stopped
459 } else {
460 resource::Status::Running
461 };
462 let mut statuses = self
463 .health_state
464 .statuses
465 .iter()
466 .map(|(p, (s, _))| (p.clone(), s.clone()))
467 .collect::<Vec<_>>();
468 statuses.sort_by_key(|(p, _)| p.rank());
469 let statuses: ValueMesh<resource::Status> =
470 statuses
471 .into_iter()
472 .map(|(_, s)| s)
473 .collect_mesh::<ValueMesh<_>>(self.mesh.region().clone())?;
474 let state = resource::mesh::State {
475 statuses,
476 state: (),
477 };
478 message.reply.send(
479 cx,
480 resource::State {
481 name: message.name,
482 status,
483 state: Some(state),
484 generation: 0,
485 timestamp: std::time::SystemTime::now(),
486 },
487 )?;
488 Ok(())
489 }
490}
491
#[async_trait]
impl<A: Referable> Handler<resource::Stop> for ActorMeshController<A> {
    /// Stop the supervised actor mesh: mark it stream-closed, notify all
    /// subscribers, forward the stop to the proc mesh, and pin the resulting
    /// per-rank statuses at the maximum generation so later, stale updates
    /// cannot resurrect them.
    async fn handle(&mut self, cx: &Context<Self>, message: resource::Stop) -> anyhow::Result<()> {
        let mesh = &self.mesh;
        let mesh_name = mesh.name();
        tracing::info!(
            name = "ActorMeshControllerStatus",
            %mesh_name,
            reason = %message.reason,
            "stopping actor mesh"
        );
        // Taking the monitor makes this handler idempotent: a second Stop
        // (or a later cleanup) is a no-op.
        if self.monitor.take().is_none() {
            tracing::debug!(actor_id = %cx.self_id(), actor_mesh = %mesh_name, "duplicate stop request, actor mesh is already stopped");
            return Ok(());
        }
        tracing::info!(actor_id = %cx.self_id(), actor_mesh = %mesh_name, "forwarding stop request from ActorMeshController to proc mesh");

        // Synthesize a Stopped event attributed to rank 0 as the mesh-wide
        // notification payload (presumably rank 0 stands in for the whole
        // mesh here — TODO confirm).
        let rank = 0usize;
        let event = ActorSupervisionEvent::new(
            mesh.get(rank).unwrap().actor_id().clone(),
            None,
            ActorStatus::Stopped("ActorMeshController received explicit stop request".to_string()),
            None,
        );
        let failure_message = MeshFailure {
            actor_mesh_name: Some(mesh_name.to_string()),
            event,
            crashed_ranks: vec![],
        };
        self.health_state.unhealthy_event = Some(Unhealthy::StreamClosed(failure_message.clone()));
        for subscriber in self.health_state.subscribers.iter() {
            send_subscriber_message(cx, subscriber, failure_message.clone());
        }

        // Captured before `stop` mutates nothing but to satisfy borrows below.
        let max_rank = self.health_state.statuses.keys().map(|p| p.rank()).max();
        let extent = self
            .health_state
            .statuses
            .keys()
            .next()
            .map(|p| p.extent().clone());
        match self.stop(cx, message.reason.clone()).await {
            Ok(statuses) => {
                // Record final statuses at generation u64::MAX ("sticky").
                for (rank, status) in statuses.iter() {
                    self.health_state
                        .statuses
                        .entry(rank)
                        .and_modify(move |s| *s = (status, u64::MAX));
                }
            }
            Err(crate::Error::ActorStopError { statuses }) => {
                // Partial failure: the error still carries per-rank statuses;
                // materialize them over the full rank range and pin them.
                if let Some(max_rank) = max_rank {
                    let extent = extent.expect("no actors in mesh");
                    for (rank, status) in statuses.materialized_iter(max_rank).enumerate() {
                        *self
                            .health_state
                            .statuses
                            .get_mut(&extent.point_of_rank(rank).expect("illegal rank"))
                            .unwrap() = (status.clone(), u64::MAX);
                    }
                }
            }
            Err(e) => {
                return Err(e.into());
            }
        }

        tracing::info!(actor_id = %cx.self_id(), actor_mesh = %mesh_name, "stopped mesh");
        Ok(())
    }
}
584
585fn send_heartbeat(cx: &impl context::Actor, health_state: &HealthState) {
592 tracing::debug!(
593 num_subscribers = health_state.subscribers.len(),
594 "sending heartbeat to subscribers",
595 );
596
597 for subscriber in health_state.subscribers.iter() {
598 let mut headers = Flattrs::new();
599 headers.set(ACTOR_MESH_SUBSCRIBER_MESSAGE, true);
600 if let Err(e) = subscriber.send_with_headers(cx, headers, None) {
601 tracing::warn!(subscriber = %subscriber.port_id(), "error sending heartbeat message: {:?}", e);
602 }
603 }
604}
605
/// Record a supervision event for `rank` in `health_state` and fan it out:
/// error events go to the owner (if any), and all events go to every
/// subscriber. State is mutated before any notification is sent.
fn send_state_change(
    cx: &impl context::Actor,
    rank: usize,
    event: ActorSupervisionEvent,
    mesh_name: &Name,
    is_proc_stopped: bool,
    health_state: &mut HealthState,
) {
    let is_failed = event.is_error();
    if is_failed {
        tracing::warn!(
            name = "SupervisionEvent",
            actor_mesh = %mesh_name,
            %event,
            "detected supervision error on monitored mesh: name={mesh_name}",
        );
    } else {
        tracing::debug!(
            name = "SupervisionEvent",
            actor_mesh = %mesh_name,
            %event,
            "detected non-error supervision event on monitored mesh: name={mesh_name}",
        );
    }

    let failure_message = MeshFailure {
        actor_mesh_name: Some(mesh_name.to_string()),
        event: event.clone(),
        crashed_ranks: vec![rank],
    };
    // Record the crash and the (latest) unhealthy event before notifying,
    // so a Subscribe arriving afterwards replays consistent state.
    health_state.crashed_ranks.insert(rank, event.clone());
    health_state.unhealthy_event = Some(if is_proc_stopped {
        Unhealthy::StreamClosed(failure_message.clone())
    } else {
        Unhealthy::Crashed(failure_message.clone())
    });
    // Only genuine errors are escalated to the owner; subscribers receive
    // every state change below.
    if is_failed {
        if let Some(owner) = &health_state.owner {
            if let Err(error) = owner.send(cx, failure_message.clone()) {
                tracing::warn!(
                    name = "SupervisionEvent",
                    actor_mesh = %mesh_name,
                    %event,
                    %error,
                    "failed to send supervision event to owner {}: {}. dropping event",
                    owner.port_id(),
                    error
                );
            } else {
                tracing::info!(actor_mesh = %mesh_name, %event, "sent supervision failure message to owner {}", owner.port_id());
            }
        }
    }
    for subscriber in health_state.subscribers.iter() {
        send_subscriber_message(cx, subscriber, failure_message.clone());
    }
}
675
676fn actor_state_to_supervision_events(
677 state: resource::State<ActorState>,
678) -> (usize, Vec<ActorSupervisionEvent>) {
679 let (rank, actor_id, events) = match state.state {
680 Some(inner) => (
681 inner.create_rank,
682 Some(inner.actor_id),
683 inner.supervision_events.clone(),
684 ),
685 None => (0, None, vec![]),
686 };
687 let events = match state.status {
688 resource::Status::NotExist | resource::Status::Stopped | resource::Status::Timeout(_) => {
691 if !events.is_empty() {
693 events
694 } else {
695 vec![ActorSupervisionEvent::new(
696 actor_id.expect("actor_id is None"),
697 None,
698 ActorStatus::Stopped(
699 format!(
700 "actor status is {}; actor may have been killed",
701 state.status
702 )
703 .to_string(),
704 ),
705 None,
706 )]
707 }
708 }
709 resource::Status::Failed(_) => events,
710 _ => vec![],
712 };
713 (rank, events)
714}
715
716fn proc_status_to_actor_status(proc_status: Option<ProcStatus>) -> ActorStatus {
725 match proc_status {
726 Some(ProcStatus::Stopped { exit_code: 0, .. }) => {
727 ActorStatus::Stopped("process exited cleanly".to_string())
728 }
729 Some(ProcStatus::Stopped { exit_code, .. }) => {
730 ActorStatus::Failed(ActorErrorKind::Generic(format!(
731 "the process this actor was running on exited with non-zero code {}",
732 exit_code
733 )))
734 }
735 Some(ProcStatus::Stopping { .. }) => {
738 ActorStatus::Stopped("process is stopping".to_string())
739 }
740 None => ActorStatus::Stopped("no status received from process".to_string()),
742 Some(status) => ActorStatus::Failed(ActorErrorKind::Generic(format!(
743 "the process this actor was running on failed: {}",
744 status
745 ))),
746 }
747}
748
749#[async_trait]
750impl<A: Referable> Handler<resource::State<ActorState>> for ActorMeshController<A> {
751 async fn handle(
752 &mut self,
753 cx: &Context<Self>,
754 state: resource::State<ActorState>,
755 ) -> anyhow::Result<()> {
756 let (rank, events) = actor_state_to_supervision_events(state.clone());
757 let point = self.mesh.region().extent().point_of_rank(rank)?;
758
759 let changed = self
760 .health_state
761 .maybe_update(point, state.status, state.generation);
762
763 if changed && !events.is_empty() {
764 send_state_change(
765 cx,
766 rank,
767 events[0].clone(),
768 self.mesh.name(),
769 false,
770 &mut self.health_state,
771 );
772 }
773
774 if self
777 .health_state
778 .statuses
779 .values()
780 .all(|(s, _)| s.is_terminating())
781 {
782 self.monitor.take();
783 }
784 Ok(())
785 }
786}
787
788fn format_system_time(time: std::time::SystemTime) -> String {
789 let datetime: chrono::DateTime<chrono::Local> = time.into();
790 datetime.format("%Y-%m-%d %H:%M:%S").to_string()
791}
792
#[async_trait]
impl<A: Referable> Handler<CheckState> for ActorMeshController<A> {
    /// Periodic supervision poll. In order:
    /// 1. bail out if monitoring was stopped;
    /// 2. record a stall metric if this handler ran a full interval late;
    /// 3. query proc states — a query failure or any terminating proc is
    ///    reported as a state change and the poll is rescheduled;
    /// 4. query actor states (refreshing the orphan keepalive), fold them
    ///    into `health_state`, and report any changes;
    /// 5. send a heartbeat only if nothing was reported and nothing is
    ///    terminating; reschedule unless every rank is terminal.
    async fn handle(
        &mut self,
        cx: &Context<Self>,
        CheckState(expected_time): CheckState,
    ) -> Result<(), anyhow::Error> {
        if self.monitor.is_none() {
            return Ok(());
        }

        // Stall detection: we expected to run at `expected_time`; being more
        // than one poll interval late suggests the actor's queue is backed up.
        if std::time::SystemTime::now()
            > expected_time + hyperactor_config::global::get(SUPERVISION_POLL_FREQUENCY)
        {
            let expected_time = format_system_time(expected_time);
            ACTOR_MESH_CONTROLLER_SUPERVISION_STALLS.add(1, kv_pairs!("actor_id" => cx.self_id().to_string(), "expected_time" => expected_time.clone()));
            tracing::warn!(
                actor_id = %cx.self_id(),
                "Handler<CheckState> is being stalled, expected at {}",
                expected_time,
            );
        }
        let mesh = &self.mesh;
        let supervision_display_name = &self.supervision_display_name;
        let proc_states = mesh.proc_mesh().proc_states(cx).await;
        if let Err(e) = proc_states {
            // The query itself failed: surface that as a rank-0 failure event
            // and keep polling.
            send_state_change(
                cx,
                0,
                ActorSupervisionEvent::new(
                    cx.self_id().clone(),
                    None,
                    ActorStatus::generic_failure(format!(
                        "unable to query for proc states: {:?}",
                        e
                    )),
                    None,
                ),
                mesh.name(),
                false,
                &mut self.health_state,
            );
            self.self_check_state_message(cx)?;
            return Ok(());
        }
        if let Some(proc_states) = proc_states.unwrap() {
            // Report the first terminating proc (if any) as a proc-stop
            // supervision event for the actor at that rank.
            if let Some((point, state)) = proc_states
                .iter()
                .find(|(_rank, state)| state.status.is_terminating())
            {
                let actor_status =
                    proc_status_to_actor_status(state.state.and_then(|s| s.proc_status));
                let display_name = crate::actor_display_name(supervision_display_name, &point);
                send_state_change(
                    cx,
                    point.rank(),
                    ActorSupervisionEvent::new(
                        mesh.get(point.rank()).unwrap().actor_id().clone(),
                        Some(display_name),
                        actor_status,
                        None,
                    ),
                    mesh.name(),
                    true,
                    &mut self.health_state,
                );
                self.self_check_state_message(cx)?;
                return Ok(());
            }
        }

        // Refresh the orphan keepalive while querying actor states; a zero
        // timeout disables the keepalive mechanism entirely.
        let orphan_timeout = hyperactor_config::global::get(MESH_ORPHAN_TIMEOUT);
        let keepalive = if orphan_timeout.is_zero() {
            None
        } else {
            Some(std::time::SystemTime::now() + orphan_timeout)
        };
        let events = mesh.actor_states_with_keepalive(cx, keepalive).await;
        if let Err(e) = events {
            send_state_change(
                cx,
                0,
                ActorSupervisionEvent::new(
                    cx.self_id().clone(),
                    Some(supervision_display_name.clone()),
                    ActorStatus::generic_failure(format!(
                        "unable to query for actor states: {:?}",
                        e
                    )),
                    None,
                ),
                mesh.name(),
                false,
                &mut self.health_state,
            );
            self.self_check_state_message(cx)?;
            return Ok(());
        }
        let mut did_send_state_change = false;
        let mut any_terminating = false;
        for (point, state) in events.unwrap().iter() {
            let changed = self.health_state.maybe_update(
                point.clone(),
                state.status.clone(),
                state.generation,
            );
            // Check the *stored* status (post-update) for termination, since
            // maybe_update may have rejected a stale report.
            if !any_terminating {
                if let Some((s, _)) = self.health_state.statuses.get(&point) {
                    if s.is_terminating() {
                        any_terminating = true;
                    }
                }
            }
            if !changed {
                continue;
            }
            let (rank, events) = actor_state_to_supervision_events(state.clone());
            if events.is_empty() {
                continue;
            }
            did_send_state_change = true;
            send_state_change(
                cx,
                rank,
                events[0].clone(),
                mesh.name(),
                false,
                &mut self.health_state,
            );
        }
        // A heartbeat doubles as "all quiet"; suppress it when something was
        // reported this round or any rank is terminating.
        if !did_send_state_change && !any_terminating {
            send_heartbeat(cx, &self.health_state);
        }

        let all_terminating = self
            .health_state
            .statuses
            .values()
            .all(|(s, _)| s.is_terminating());
        if !all_terminating {
            self.self_check_state_message(cx)?;
        } else {
            // Everything is terminal: stop polling.
            self.monitor.take();
        }
        return Ok(());
    }
}
988
/// Controller whose job is to stop its proc mesh when the controller itself
/// is cleaned up.
#[derive(Debug)]
#[hyperactor::export]
pub(crate) struct ProcMeshController {
    // The proc mesh to stop on cleanup.
    mesh: ProcMeshRef,
}
994
995impl ProcMeshController {
996 pub(crate) fn new(mesh: ProcMeshRef) -> Self {
998 Self { mesh }
999 }
1000}
1001
1002#[async_trait]
1003impl Actor for ProcMeshController {
1004 async fn init(&mut self, this: &Instance<Self>) -> Result<(), anyhow::Error> {
1005 this.set_system();
1006 Ok(())
1007 }
1008
1009 async fn cleanup(
1010 &mut self,
1011 this: &Instance<Self>,
1012 _err: Option<&ActorError>,
1013 ) -> Result<(), anyhow::Error> {
1014 let names = self
1016 .mesh
1017 .proc_ids()
1018 .collect::<Vec<hyperactor_reference::ProcId>>();
1019 let region = self.mesh.region().clone();
1020 if let Some(hosts) = self.mesh.hosts() {
1021 hosts
1022 .stop_proc_mesh(
1023 this,
1024 self.mesh.name(),
1025 names,
1026 region,
1027 "proc mesh controller cleanup".to_string(),
1028 )
1029 .await
1030 } else {
1031 Ok(())
1032 }
1033 }
1034}
1035
/// Controller whose job is to shut down every host in its mesh when the
/// controller itself is cleaned up.
#[derive(Debug)]
#[hyperactor::export]
pub(crate) struct HostMeshController {
    // The host mesh to shut down on cleanup.
    mesh: HostMeshRef,
}
1041
1042impl HostMeshController {
1043 pub(crate) fn new(mesh: HostMeshRef) -> Self {
1045 Self { mesh }
1046 }
1047}
1048
1049#[async_trait]
1050impl Actor for HostMeshController {
1051 async fn init(&mut self, this: &Instance<Self>) -> Result<(), anyhow::Error> {
1052 this.set_system();
1053 Ok(())
1054 }
1055
1056 async fn cleanup(
1057 &mut self,
1058 this: &Instance<Self>,
1059 _err: Option<&ActorError>,
1060 ) -> Result<(), anyhow::Error> {
1061 for host in self.mesh.values() {
1063 if let Err(e) = host.shutdown(this).await {
1064 tracing::warn!(host = %host, error = %e, "host shutdown failed");
1065 }
1066 }
1067 Ok(())
1068 }
1069}
1070
#[cfg(test)]
mod tests {
    use std::ops::Deref;
    use std::time::Duration;

    use hyperactor::actor::ActorStatus;
    use ndslice::Extent;
    use ndslice::ViewExt;

    use super::SUPERVISION_POLL_FREQUENCY;
    use super::proc_status_to_actor_status;
    use crate::ActorMesh;
    use crate::Name;
    use crate::bootstrap::ProcStatus;
    use crate::proc_agent::MESH_ORPHAN_TIMEOUT;
    use crate::resource;
    use crate::supervision::MeshFailure;
    use crate::test_utils::local_host_mesh;
    use crate::testactor;
    use crate::testing;

    // Spawns an actor mesh with a short orphan timeout, verifies the actors
    // run while the keepalive is fresh, then verifies they are stopped once
    // the keepalive expires without renewal.
    #[tokio::test]
    async fn test_orphaned_actors_are_cleaned_up() {
        let config = hyperactor_config::global::lock();
        let _orphan = config.override_key(MESH_ORPHAN_TIMEOUT, Duration::from_secs(1));

        let instance = testing::instance();
        let host_mesh = local_host_mesh(2).await;
        let proc_mesh = host_mesh
            .spawn(instance, "test", Extent::unity(), None)
            .await
            .unwrap();

        let actor_name = Name::new("orphan_test").unwrap();
        let actor_mesh: ActorMesh<testactor::TestActor> = proc_mesh
            .spawn_with_name(instance, actor_name.clone(), &(), None, true)
            .await
            .unwrap();
        assert!(
            actor_mesh.deref().extent().num_ranks() > 0,
            "should have spawned at least one actor"
        );

        // Query with a keepalive 2s in the future: actors must still be alive.
        let states = proc_mesh
            .actor_states_with_keepalive(
                instance,
                actor_name.clone(),
                Some(std::time::SystemTime::now() + Duration::from_secs(2)),
            )
            .await
            .unwrap();
        for state in states.values() {
            assert_eq!(
                state.status,
                resource::Status::Running,
                "actor should be running before expiry"
            );
        }

        // Sleep past the keepalive so the orphan cleanup kicks in.
        tokio::time::sleep(Duration::from_secs(5)).await;

        let states = proc_mesh
            .actor_states(instance, actor_name.clone())
            .await
            .unwrap();
        for state in states.values() {
            assert_eq!(
                state.status,
                resource::Status::Stopped,
                "actor should be stopped after keepalive expiry"
            );
        }
    }

    // Boots `n` out-of-process hosts (propagating the current global config
    // to each) and returns a guard that shuts them down on drop.
    #[cfg(fbcode_build)]
    async fn host_mesh_with_config(n: usize) -> crate::host_mesh::HostMeshShutdownGuard {
        use hyperactor::channel::ChannelTransport;
        use tokio::process::Command;

        let program = crate::testresource::get("monarch/hyperactor_mesh/bootstrap");
        let mut host_addrs = vec![];
        for _ in 0..n {
            host_addrs.push(ChannelTransport::Unix.any());
        }

        for host in host_addrs.iter() {
            let mut cmd = Command::new(program.clone());
            let boot = crate::Bootstrap::Host {
                addr: host.clone(),
                command: None,
                config: Some(hyperactor_config::global::attrs()),
                exit_on_shutdown: false,
            };
            boot.to_env(&mut cmd);
            cmd.kill_on_drop(false);
            // SAFETY: `pre_exec` runs in the forked child before exec;
            // `install_pdeathsig_kill` is assumed to be async-signal-safe
            // per its contract — TODO confirm.
            unsafe {
                cmd.pre_exec(crate::bootstrap::install_pdeathsig_kill);
            }
            cmd.spawn().unwrap();
        }

        let host_mesh = crate::HostMeshRef::from_hosts(Name::new("test").unwrap(), host_addrs);
        crate::host_mesh::HostMesh::take(host_mesh).shutdown_guard()
    }

    // Runs the actors and their controller on separate host meshes, kills the
    // controller's process, and verifies the orphaned child actors are
    // cleaned up once the keepalive stops being refreshed.
    #[tokio::test]
    #[cfg(fbcode_build)]
    async fn test_orphaned_actors_cleaned_up_on_controller_crash() {
        let config = hyperactor_config::global::lock();
        let _orphan = config.override_key(MESH_ORPHAN_TIMEOUT, Duration::from_secs(2));
        let _poll = config.override_key(SUPERVISION_POLL_FREQUENCY, Duration::from_secs(1));

        let instance = testing::instance();
        let num_replicas = 2;

        let mut actor_hm = host_mesh_with_config(num_replicas).await;
        let actor_proc_mesh = actor_hm
            .spawn(instance, "actors", Extent::unity(), None)
            .await
            .unwrap();

        let mut controller_hm = host_mesh_with_config(1).await;
        let controller_proc_mesh = controller_hm
            .spawn(instance, "controller", Extent::unity(), None)
            .await
            .unwrap();

        let child_name = Name::new("orphan_child").unwrap();

        let (supervision_port, _supervision_receiver) = instance.open_port::<MeshFailure>();
        let supervisor = supervision_port.bind();

        // The wrapper spawns child actors on the actor proc mesh; killing the
        // wrapper's process later orphans those children.
        let wrapper_mesh: ActorMesh<testactor::WrapperActor> = controller_proc_mesh
            .spawn(
                instance,
                "wrapper",
                &(
                    actor_proc_mesh.deref().clone(),
                    supervisor,
                    child_name.clone(),
                ),
            )
            .await
            .unwrap();

        tokio::time::sleep(Duration::from_secs(3)).await;

        let states = actor_proc_mesh
            .actor_states(instance, child_name.clone())
            .await
            .unwrap();
        for state in states.values() {
            assert_eq!(
                state.status,
                resource::Status::Running,
                "actor should be running before controller crash"
            );
        }

        // Crash the controller's process (exit code 1).
        wrapper_mesh
            .cast(
                instance,
                testactor::CauseSupervisionEvent {
                    kind: testactor::SupervisionEventType::ProcessExit(1),
                    send_to_children: false,
                },
            )
            .unwrap();

        // Wait past the orphan timeout for cleanup to run.
        tokio::time::sleep(Duration::from_secs(8)).await;

        let states = actor_proc_mesh
            .actor_states(instance, child_name.clone())
            .await
            .unwrap();
        for state in states.values() {
            assert_eq!(
                state.status,
                resource::Status::Stopped,
                "actor should be stopped after controller crash and orphan timeout"
            );
        }

        let _ = actor_hm.shutdown(instance).await;
        let _ = controller_hm.shutdown(instance).await;
    }

    #[test]
    fn test_proc_status_to_actor_status_stopped_cleanly() {
        let status = proc_status_to_actor_status(Some(ProcStatus::Stopped {
            exit_code: 0,
            stderr_tail: vec![],
        }));
        assert!(
            matches!(status, ActorStatus::Stopped(ref msg) if msg.contains("cleanly")),
            "expected Stopped, got {:?}",
            status
        );
    }

    #[test]
    fn test_proc_status_to_actor_status_nonzero_exit() {
        let status = proc_status_to_actor_status(Some(ProcStatus::Stopped {
            exit_code: 1,
            stderr_tail: vec![],
        }));
        assert!(
            matches!(status, ActorStatus::Failed(_)),
            "expected Failed, got {:?}",
            status
        );
    }

    #[test]
    fn test_proc_status_to_actor_status_stopping_is_not_a_failure() {
        let status = proc_status_to_actor_status(Some(ProcStatus::Stopping {
            started_at: std::time::SystemTime::now(),
        }));
        assert!(
            matches!(status, ActorStatus::Stopped(ref msg) if msg.contains("stopping")),
            "expected Stopped, got {:?}",
            status
        );
    }

    #[test]
    fn test_proc_status_to_actor_status_none() {
        let status = proc_status_to_actor_status(None);
        assert!(
            matches!(status, ActorStatus::Stopped(_)),
            "expected Stopped, got {:?}",
            status
        );
    }

    #[test]
    fn test_proc_status_to_actor_status_killed() {
        let status = proc_status_to_actor_status(Some(ProcStatus::Killed {
            signal: 9,
            core_dumped: false,
        }));
        assert!(
            matches!(status, ActorStatus::Failed(_)),
            "expected Failed, got {:?}",
            status
        );
    }

    #[test]
    fn test_proc_status_to_actor_status_failed() {
        let status = proc_status_to_actor_status(Some(ProcStatus::Failed {
            reason: "oom".to_string(),
        }));
        assert!(
            matches!(status, ActorStatus::Failed(_)),
            "expected Failed, got {:?}",
            status
        );
    }
}