Skip to main content

hyperactor_mesh/
mesh_controller.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9use std::collections::HashMap;
10use std::collections::HashSet;
11use std::fmt::Debug;
12use std::time::SystemTime;
13
14use async_trait::async_trait;
15use hyperactor::Actor;
16use hyperactor::Bind;
17use hyperactor::Context;
18use hyperactor::Endpoint as _;
19use hyperactor::Handler;
20use hyperactor::Instance;
21use hyperactor::RemoteEndpoint as _;
22use hyperactor::Unbind;
23use hyperactor::actor::ActorError;
24use hyperactor::actor::ActorErrorKind;
25use hyperactor::actor::ActorStatus;
26use hyperactor::actor::Referable;
27use hyperactor::actor::handle_undeliverable_message;
28use hyperactor::context;
29use hyperactor::kv_pairs;
30use hyperactor::mailbox::MessageEnvelope;
31use hyperactor::mailbox::RemoteMessage;
32use hyperactor::mailbox::Undeliverable;
33use hyperactor::supervision::ActorSupervisionEvent;
34use hyperactor_config::CONFIG;
35use hyperactor_config::ConfigAttr;
36use hyperactor_config::Flattrs;
37use hyperactor_config::attrs::declare_attrs;
38use hyperactor_telemetry::declare_static_counter;
39use ndslice::ViewExt;
40use ndslice::view::CollectMeshExt;
41use ndslice::view::Point;
42use ndslice::view::Ranked;
43use opentelemetry::metrics::Counter;
44use serde::Deserialize;
45use serde::Serialize;
46use tokio::time::Duration;
47use typeuri::Named;
48
49use crate::ValueMesh;
50use crate::actor_mesh::ActorMeshRef;
51use crate::bootstrap::ProcStatus;
52use crate::casting::CAST_ACTOR_MESH_ID;
53use crate::casting::update_undeliverable_envelope_for_casting;
54use crate::mesh_id::ResourceId;
55use crate::proc_agent::ActorState;
56use crate::proc_agent::MESH_ORPHAN_TIMEOUT;
57use crate::proc_mesh::ProcMeshRef;
58use crate::resource;
59use crate::supervision::MeshFailure;
60use crate::supervision::Unhealthy;
61
/// Name under which an `ActorMeshController` is registered when it is
/// spawned as a named child actor.
pub const ACTOR_MESH_CONTROLLER_NAME: &str = "actor_mesh_controller";
64
declare_attrs! {
    /// Interval between `CheckState` polls of the controlled mesh.
    ///
    /// Each poll inspects the rank states and produces supervision events
    /// for the owner and subscribers. A longer interval delays failure
    /// detection and reporting; a shorter interval sends more messages and
    /// heartbeats just to confirm that everything is still running. The
    /// default (10 seconds) is chosen to balance these two objectives.
    ///
    /// The same interval controls how often the healthy heartbeat is sent
    /// to subscribers when no failure has been observed, and it is the
    /// grace period used when deciding that the supervision loop has
    /// stalled.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_MESH_SUPERVISION_POLL_FREQUENCY".to_string()),
        None,
    ))
    pub attr SUPERVISION_POLL_FREQUENCY: Duration = Duration::from_secs(10);
}
79
// Counts detected supervision-loop stalls, reported under the metric name
// `actor.actor_mesh_controller.num_stalls`.
// NOTE(review): presumably the counter returned by
// `Controlled::stall_counter` for actor meshes — confirm at the impl site.
declare_static_counter!(
    ACTOR_MESH_CONTROLLER_SUPERVISION_STALLS,
    "actor.actor_mesh_controller.num_stalls"
);

// Counts detected supervision-loop stalls, reported under the metric name
// `actor.proc_mesh_controller.num_stalls`.
// NOTE(review): presumably the counter returned by
// `Controlled::stall_counter` for proc meshes — confirm at the impl site.
declare_static_counter!(
    PROC_MESH_CONTROLLER_SUPERVISION_STALLS,
    "actor.proc_mesh_controller.num_stalls"
);
89
/// Aggregated health and subscriber bookkeeping for a single
/// `ResourceController`.
///
/// Tracks the most recently observed status of every rank in the
/// controlled mesh, the latched unhealthy event (if any), the owner port
/// (notified on failures), and the set of streaming subscribers (notified
/// on both stop and failure events). The generation counter on each status
/// entry provides last-writer-wins ordering between streamed and polled
/// updates.
#[derive(Debug)]
pub struct HealthState {
    /// The status of each rank in the controlled mesh, paired with the
    /// generation counter from the most recent update. An update carrying
    /// a lower generation than the stored one is ignored, and a rank whose
    /// stored status is terminating is never updated again.
    statuses: HashMap<Point, (resource::Status, u64)>,
    /// The latched unhealthy event for the mesh, if any. Overwritten by
    /// every call to `send_state_change`; used to derive the mesh-level
    /// status reported to `GetState` callers.
    unhealthy_event: Option<Unhealthy>,
    /// Per-rank supervision events, keyed by rank. Populated by
    /// `send_state_change` for every reported event, which includes clean
    /// stops as well as crashes. Used to build region-scoped failure
    /// reports.
    crashed_ranks: HashMap<usize, ActorSupervisionEvent>,
    /// The single owner of the controlled mesh, notified on failure events
    /// (but not on clean stops).
    owner: Option<hyperactor::PortRef<MeshFailure>>,
    /// Streaming subscribers, notified on both stop and failure events as
    /// well as periodic heartbeats. Stored as a set, so subscribing the
    /// same port twice results in a single entry.
    subscribers: HashSet<hyperactor::PortRef<Option<MeshFailure>>>,
}
116
117impl HealthState {
118    fn new(
119        statuses: HashMap<Point, resource::Status>,
120        owner: Option<hyperactor::PortRef<MeshFailure>>,
121    ) -> Self {
122        Self {
123            statuses: statuses
124                .into_iter()
125                .map(|(point, status)| (point, (status, 0)))
126                .collect(),
127            unhealthy_event: None,
128            crashed_ranks: HashMap::new(),
129            owner,
130            subscribers: HashSet::new(),
131        }
132    }
133
134    /// Try to update the status at `point`. Returns `true` if the status
135    /// was newly inserted or changed; `false` if dominated by a higher
136    /// generation or unchanged.
137    fn maybe_update(&mut self, point: Point, status: resource::Status, generation: u64) -> bool {
138        use std::collections::hash_map::Entry;
139        match self.statuses.entry(point) {
140            Entry::Occupied(mut entry) => {
141                let (old_status, old_gen) = entry.get();
142                // Once a resource enters a terminating state (including Stopping),
143                // its status is frozen — later updates are ignored.
144                if old_status.is_terminating() || *old_gen > generation {
145                    return false;
146                }
147                let changed = *old_status != status;
148                *entry.get_mut() = (status, generation);
149                changed
150            }
151            Entry::Vacant(entry) => {
152                entry.insert((status, generation));
153                true
154            }
155        }
156    }
157
158    /// True when every tracked rank has reached a terminating status.
159    fn all_terminating(&self) -> bool {
160        self.statuses.values().all(|(s, _)| s.is_terminating())
161    }
162
163    /// True when at least one tracked rank has reached a terminating status.
164    fn any_terminating(&self) -> bool {
165        self.statuses.values().any(|(s, _)| s.is_terminating())
166    }
167
168    /// Apply status updates from polled resource states and invoke `on_change`
169    /// for each rank whose status actually changed. The point passed to
170    /// `on_change` is the created rank, *not* the rank of the possibly sliced
171    /// input mesh. Returns `true` if `on_change` reported at least one
172    /// notification (used to decide whether a heartbeat is needed).
173    pub(crate) fn apply_updates_and_notify<S: Clone + 'static>(
174        &mut self,
175        states: &ValueMesh<resource::State<S>>,
176        mut on_change: impl FnMut(resource::State<S>, &mut HealthState) -> bool,
177    ) -> bool {
178        let mut did_notify = false;
179        for (point, state) in states.iter() {
180            let status = state.status.clone();
181            let generation = state.generation;
182            if self.maybe_update(point, status, generation) && on_change(state, self) {
183                did_notify = true;
184            }
185        }
186        did_notify
187    }
188}
189
/// Outcome of the mesh-specific polling phase inside `CheckState`.
///
/// Returned by `Controlled::poll_states` so the generic controller can
/// decide whether to send a heartbeat and whether to keep polling.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PollResult {
    /// An error or early condition was handled internally; just reschedule.
    Reschedule,
    /// States were polled and processed. `did_notify` is true if at least
    /// one subscriber/owner notification was sent.
    Processed { did_notify: bool },
}
198
199/// Compute the keepalive expiry from `MESH_ORPHAN_TIMEOUT`, or `None` if
200/// the timeout is disabled.
201fn compute_keepalive() -> Option<SystemTime> {
202    hyperactor_config::global::get(MESH_ORPHAN_TIMEOUT).map(|d| SystemTime::now() + d)
203}
204
/// Subscribe the given port to supervision updates about a mesh.
///
/// Subscribing the same port more than once has no additional effect: only
/// a single message is sent per update.
///
/// While no failure has been observed, the controller periodically sends
/// `None` as a heartbeat. This tells the listener that the controller is
/// still alive; listeners should filter these heartbeats out, as they carry
/// no failure information.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Named, Bind, Unbind)]
pub struct Subscribe(pub hyperactor::PortRef<Option<MeshFailure>>);
wirevalue::register_type!(Subscribe);
213
/// Unsubscribe the given port from future updates about a mesh. The port
/// should be the same one that was passed in the `Subscribe` message.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Named, Bind, Unbind)]
pub struct Unsubscribe(pub hyperactor::PortRef<Option<MeshFailure>>);
wirevalue::register_type!(Unsubscribe);
219
/// Query the number of active supervision subscribers on this controller.
/// The count is sent to the contained reply port.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Named, Bind, Unbind)]
pub struct GetSubscriberCount(#[binding(include)] pub hyperactor::PortRef<usize>);
wirevalue::register_type!(GetSubscriberCount);
224
/// Check the state of the resources in the mesh. The controller sends this
/// to itself to drive the periodic supervision poll.
///
/// The payload is the time at which this check was expected to start
/// running. The handler compares it against the current time to detect
/// stalls in message handling.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Named, Bind, Unbind)]
pub struct CheckState(pub SystemTime);
wirevalue::register_type!(CheckState);
232
declare_attrs! {
    /// Header marking a message as sent from a mesh controller to one of its
    /// subscribers (both failure notifications and heartbeats set it).
    /// If such a message is returned as undeliverable it can be safely
    /// dropped; the controller also removes the destination port from its
    /// subscriber set so it is not sent to again.
    pub attr ACTOR_MESH_SUBSCRIBER_MESSAGE: bool;
}
238
239fn send_subscriber_message(
240    cx: &impl context::Actor,
241    subscriber: &hyperactor::PortRef<Option<MeshFailure>>,
242    message: MeshFailure,
243) {
244    let mut headers = Flattrs::new();
245    headers.set(ACTOR_MESH_SUBSCRIBER_MESSAGE, true);
246    subscriber.post_with_headers(cx, headers, Some(message.clone()));
247    tracing::info!(event = %message, "sent supervision failure message to subscriber {}", subscriber.port_addr());
248}
249
250/// Like send_state_change, but when there was no state change that occurred.
251/// Will send a None message to subscribers, and there is no state to change.
252/// Is not sent to the owner, because the owner is only watching for failures.
253/// Should be called once every so often so subscribers can discern the difference
254/// between "no messages because no errors" and "no messages because controller died".
255/// Without sending these hearbeats, subscribers will assume the mesh is dead.
256fn send_heartbeat(cx: &impl context::Actor, health_state: &HealthState) {
257    tracing::debug!(
258        num_subscribers = health_state.subscribers.len(),
259        "sending heartbeat to subscribers",
260    );
261
262    for subscriber in health_state.subscribers.iter() {
263        let mut headers = Flattrs::new();
264        headers.set(ACTOR_MESH_SUBSCRIBER_MESSAGE, true);
265        subscriber.post_with_headers(cx, headers, None);
266    }
267}
268
269/// Sends a MeshFailure to the owner and subscribers of this mesh,
270/// and changes the health state stored unhealthy_event.
271/// Owners are sent a message only for Failure events, not for Stopped events.
272/// Subscribers are sent both Stopped and Failure events.
273fn send_state_change(
274    cx: &impl context::Actor,
275    rank: usize,
276    event: ActorSupervisionEvent,
277    mesh_name: &ResourceId,
278    is_proc_stopped: bool,
279    health_state: &mut HealthState,
280) {
281    // This does not include the Stopped status, which is a state that occurs when the
282    // user calls stop() on a proc or actor mesh.
283    let is_failed = event.is_error();
284    if is_failed {
285        tracing::warn!(
286            name = "SupervisionEvent",
287            actor_mesh = %mesh_name,
288            %event,
289            "detected supervision error on monitored mesh: name={mesh_name}",
290        );
291    } else {
292        tracing::debug!(
293            name = "SupervisionEvent",
294            actor_mesh = %mesh_name,
295            %event,
296            "detected non-error supervision event on monitored mesh: name={mesh_name}",
297        );
298    }
299
300    let failure_message = MeshFailure {
301        actor_mesh_name: Some(mesh_name.to_string()),
302        event: event.clone(),
303        crashed_ranks: vec![rank],
304    };
305    health_state.crashed_ranks.insert(rank, event.clone());
306    health_state.unhealthy_event = Some(if is_proc_stopped {
307        Unhealthy::StreamClosed(failure_message.clone())
308    } else {
309        Unhealthy::Crashed(failure_message.clone())
310    });
311    // Send a notification to the owning actor of this mesh, if there is one.
312    // Don't send a message to the owner for non-failure events such as "stopped".
313    // Those events are always initiated by the owner, who don't need to be
314    // told that they were stopped.
315    if is_failed && let Some(owner) = &health_state.owner {
316        owner.post(cx, failure_message.clone());
317        tracing::info!(actor_mesh = %mesh_name, %event, "sent supervision failure message to owner {}", owner.port_addr());
318    }
319    // Subscribers get all messages, even for non-failures like Stopped, because
320    // they need to know if the owner stopped the mesh.
321    for subscriber in health_state.subscribers.iter() {
322        send_subscriber_message(cx, subscriber, failure_message.clone());
323    }
324}
325
326fn actor_state_to_supervision_events(
327    state: resource::State<ActorState>,
328) -> (usize, Vec<ActorSupervisionEvent>) {
329    let (rank, actor_id, events) = match state.state {
330        Some(inner) => (
331            inner.create_rank,
332            Some(inner.actor_id),
333            inner.supervision_events.clone(),
334        ),
335        None => (0, None, vec![]),
336    };
337    let events = match state.status {
338        // If the actor was killed, it might not have a Failed status
339        // or supervision events, and it can't tell us which rank
340        resource::Status::NotExist | resource::Status::Stopped | resource::Status::Timeout(_) => {
341            // it was.
342            if !events.is_empty() {
343                events
344            } else {
345                vec![ActorSupervisionEvent::new(
346                    actor_id.expect("actor_id is None"),
347                    None,
348                    ActorStatus::Stopped(
349                        format!(
350                            "actor status is {}; actor may have been killed",
351                            state.status
352                        )
353                        .to_string(),
354                    ),
355                    None,
356                )]
357            }
358        }
359        resource::Status::Failed(_) => events,
360        // All other states are successful.
361        _ => vec![],
362    };
363    (rank, events)
364}
365
366/// Map a process-level [`ProcStatus`] to an actor-level [`ActorStatus`].
367///
368/// When the supervision poll discovers that a process is terminating, this
369/// function decides whether to treat it as a clean stop or a failure.
370/// Notably, [`ProcStatus::Stopping`] (SIGTERM sent, process not yet exited)
371/// is mapped to [`ActorStatus::Stopped`] rather than [`ActorStatus::Failed`]
372/// so that a graceful shutdown in progress does not trigger unhandled
373/// supervision errors.
374fn proc_status_to_actor_status(proc_status: Option<ProcStatus>) -> ActorStatus {
375    match proc_status {
376        Some(ProcStatus::Stopped { exit_code: 0, .. }) => {
377            ActorStatus::Stopped("process exited cleanly".to_string())
378        }
379        Some(ProcStatus::Stopped { exit_code, .. }) => {
380            ActorStatus::Failed(ActorErrorKind::Generic(format!(
381                "the process this actor was running on exited with non-zero code {}",
382                exit_code
383            )))
384        }
385        // Stopping is a transient state during graceful shutdown. Treat it the
386        // same as a clean stop rather than a failure.
387        Some(ProcStatus::Stopping { .. }) => {
388            ActorStatus::Stopped("process is stopping".to_string())
389        }
390        // Conservatively treat lack of status as stopped
391        None => ActorStatus::Stopped("no status received from process".to_string()),
392        Some(status) => ActorStatus::Failed(ActorErrorKind::Generic(format!(
393            "the process this actor was running on failed: {}",
394            status
395        ))),
396    }
397}
398
399fn format_system_time(time: SystemTime) -> String {
400    let datetime: chrono::DateTime<chrono::Local> = time.into();
401    datetime.to_rfc3339()
402}
403
404/// Log a warning and bump `counter` if the supervision loop is running late.
405///
406/// "Late" means the current wall-clock time exceeds `expected_time` by more
407/// than one full poll interval, i.e. 2x the expected period.
408fn check_stall(expected_time: SystemTime, actor_id: &hyperactor::ActorId, counter: &Counter<u64>) {
409    if SystemTime::now()
410        <= expected_time + hyperactor_config::global::get(SUPERVISION_POLL_FREQUENCY)
411    {
412        return;
413    }
414    let expected_time = format_system_time(expected_time);
415    counter.add(
416        1,
417        kv_pairs!("actor_id" => actor_id.to_string(), "expected_time" => expected_time.clone()),
418    );
419    tracing::warn!(
420        %actor_id,
421        "Handler<CheckState> is stalled, expected at {}",
422        expected_time,
423    );
424}
425
/// Mesh-specific behavior required by the generic `ResourceController`.
///
/// Each variant of resource mesh (actor, proc) implements this trait to
/// provide the details that cannot be shared by the generic controller:
/// the state type carried in `resource::State<_>`, how to query or stream
/// that state from the underlying agents, how to stop the resources, and
/// how to notify observers when the state changes.
#[async_trait]
pub trait Controlled: Clone + Debug + Send + Sync + 'static {
    /// Inner payload carried in `resource::State<Self::StateInner>`.
    type StateInner: RemoteMessage + Clone + Debug + 'static;

    /// Counter bumped when the supervision loop detects a stall. The
    /// controller passes it to the stall check at the start of every
    /// `CheckState`.
    fn stall_counter() -> &'static Counter<u64>;

    /// The mesh's resource identifier. Also used as the supervision
    /// display name when the controller was not given an explicit one.
    fn id(&self) -> &ResourceId;

    /// The region of ranks in this mesh. Used to shape the per-rank
    /// statuses returned from `GetState`.
    fn region(&self) -> &ndslice::Region;

    /// Subscribe the given port to `StreamState<StateInner>` updates from
    /// the underlying agents. Called once from the controller's `init`,
    /// before the polling monitor is started.
    fn subscribe_to_stream(
        &self,
        cx: &impl context::Actor,
        subscriber: hyperactor::PortRef<resource::State<Self::StateInner>>,
    ) -> anyhow::Result<()>;

    /// Forward a `WaitRankStatus` message to the underlying agents.
    fn forward_wait_rank_status(
        &self,
        cx: &impl context::Actor,
        msg: resource::WaitRankStatus,
    ) -> anyhow::Result<()>;

    /// Mesh-specific polling step for the supervision loop. Implementations
    /// may do pre-checks (such as the actor mesh's proc-aliveness check)
    /// before querying rank states; updates to `health_state` happen
    /// in-place. `supervision_display_name` is used for synthesised
    /// supervision events (e.g., when a proc dies).
    ///
    /// Returns `PollResult::Reschedule` when the poll was cut short and
    /// should simply be retried, or `PollResult::Processed` once the states
    /// were applied.
    async fn poll_states(
        &self,
        cx: &impl context::Actor,
        supervision_display_name: &str,
        health_state: &mut HealthState,
    ) -> PollResult;

    /// Process a single streamed or polled state. Updates the health state
    /// and notifies owner/subscribers as appropriate. Returns `true` if a
    /// notification was emitted (used to suppress heartbeats).
    fn process_state(
        &self,
        cx: &impl context::Actor,
        state: resource::State<Self::StateInner>,
        health_state: &mut HealthState,
    ) -> bool;

    /// Perform the mesh-specific stop: issue stop messages to the underlying
    /// agents and, where appropriate, update `health_state` and notify
    /// subscribers. The caller has already taken the monitor and logged.
    async fn handle_stop_request(
        &self,
        cx: &impl context::Actor,
        supervision_display_name: &str,
        reason: String,
        health_state: &mut HealthState,
    ) -> anyhow::Result<()>;

    /// Stop this mesh on controller cleanup (when `Stop` was not received
    /// but the actor is shutting down). Only invoked if the monitor was
    /// still active at cleanup time.
    async fn cleanup_stop(&self, cx: &impl context::Actor, reason: String) -> anyhow::Result<()>;
}
499
/// Generic controller for a mesh of resources. Currently instantiated as
/// `ActorMeshController<A> = ResourceController<ActorMeshRef<A>>`. All
/// shared behavior lives here; mesh-specific behavior is delegated through
/// the `Controlled` trait.
///
/// `resource::mesh::Spec<()>` and `resource::mesh::State<()>` (instead of
/// `Spec<T::Spec>` / `State<T::StateInner>`) are used because the
/// controller participates in the mesh `resource` protocol only at the
/// outer layer: callers of `GetState` on the controller want the
/// per-rank statuses and the mesh-wide status that `resource::mesh::State`
/// already carries, not the inner `T::StateInner` payload (which is
/// available rank-by-rank via the `resource::State<T::StateInner>` stream).
/// The unit type is the explicit "no extra payload" choice.
#[hyperactor::export(
    handlers=[
        Subscribe,
        Unsubscribe,
        GetSubscriberCount,
        CheckState,
        resource::WaitRankStatus,
        resource::CreateOrUpdate<resource::mesh::Spec<()>>,
        resource::GetState<resource::mesh::State<()>>,
        resource::Stop,
        resource::State<T::StateInner>,
    ]
)]
pub struct ResourceController<T: Controlled> {
    /// The mesh being controlled; all mesh-specific operations go through it.
    mesh: T,
    /// Supervision display name used in telemetry and fake supervision
    /// events. If `None`, falls back to `mesh.id()`.
    supervision_display_name: Option<String>,
    /// Shared health state for the monitor and responding to queries.
    health_state: HealthState,
    /// Marker for the periodic `CheckState` polling loop. `Some(())` while
    /// the controller is monitoring; `None` before `init` and once the
    /// controller has stopped monitoring, at which point no further checks
    /// are scheduled.
    monitor: Option<()>,
}
537
/// Controller for an actor mesh: the generic `ResourceController`
/// specialised to an `ActorMeshRef<A>`.
pub type ActorMeshController<A> = ResourceController<ActorMeshRef<A>>;
540
541impl<T: Controlled> ResourceController<T> {
542    /// Create a new controller over the given mesh.
543    pub(crate) fn new(
544        mesh: T,
545        supervision_display_name: Option<String>,
546        owner: Option<hyperactor::PortRef<MeshFailure>>,
547        initial_statuses: ValueMesh<resource::Status>,
548    ) -> Self {
549        Self {
550            mesh,
551            supervision_display_name,
552            health_state: HealthState::new(initial_statuses.iter().collect(), owner),
553            monitor: None,
554        }
555    }
556
557    /// The display name to use for supervision events and telemetry.
558    pub(crate) fn supervision_display_name(&self) -> String {
559        self.supervision_display_name
560            .clone()
561            .unwrap_or_else(|| self.mesh.id().to_string())
562    }
563
564    /// Schedule the next `CheckState` self-message if the monitor is active.
565    ///
566    /// `send_fn` bridges the type gap: the caller passes a closure that
567    /// captures the typed `Instance`/`Context` and calls
568    /// `post_after`.
569    fn schedule_next_check(&self, send_fn: impl FnOnce(CheckState, Duration)) {
570        if self.monitor.is_some() {
571            let delay = hyperactor_config::global::get(SUPERVISION_POLL_FREQUENCY);
572            send_fn(CheckState(SystemTime::now() + delay), delay);
573        }
574    }
575
576    /// Derive the mesh-level status from health state and monitor presence.
577    fn mesh_status(&self) -> resource::Status {
578        if let Some(Unhealthy::Crashed(e)) = &self.health_state.unhealthy_event {
579            resource::Status::Failed(e.to_string())
580        } else if let Some(Unhealthy::StreamClosed(_)) = &self.health_state.unhealthy_event {
581            resource::Status::Stopped
582        } else if self.monitor.is_none() {
583            resource::Status::Stopped
584        } else {
585            resource::Status::Running
586        }
587    }
588
589    /// Build and send the `GetState<resource::mesh::State<()>>` response.
590    fn handle_get_state_msg(
591        &self,
592        cx: &impl context::Actor,
593        message: resource::GetState<resource::mesh::State<()>>,
594    ) -> anyhow::Result<()> {
595        let status = self.mesh_status();
596        let mut statuses = self
597            .health_state
598            .statuses
599            .iter()
600            .map(|(p, (s, _))| (p.clone(), s.clone()))
601            .collect::<Vec<_>>();
602        statuses.sort_by_key(|(p, _)| p.rank());
603        let statuses: ValueMesh<resource::Status> =
604            statuses
605                .into_iter()
606                .map(|(_, s)| s)
607                .collect_mesh::<ValueMesh<_>>(self.mesh.region().clone())?;
608        let state = resource::mesh::State {
609            statuses,
610            state: (),
611        };
612        message.reply.post(
613            cx,
614            resource::State {
615                id: message.id,
616                status,
617                state: Some(state),
618                generation: 0,
619                timestamp: SystemTime::now(),
620            },
621        );
622        Ok(())
623    }
624
625    /// Drop the monitor if every tracked rank has reached a terminal status.
626    fn stop_if_all_terminating(&mut self) {
627        if self.health_state.all_terminating() {
628            self.monitor.take();
629        }
630    }
631
632    async fn handle_check_state(
633        &mut self,
634        cx: &Context<'_, Self>,
635        expected_time: SystemTime,
636    ) -> anyhow::Result<()>
637    where
638        resource::State<T::StateInner>: RemoteMessage,
639    {
640        if self.monitor.is_none() {
641            return Ok(());
642        }
643        check_stall(expected_time, cx.self_addr().id(), T::stall_counter());
644
645        let display = self.supervision_display_name();
646        let result = self
647            .mesh
648            .poll_states(cx, &display, &mut self.health_state)
649            .await;
650
651        match result {
652            PollResult::Reschedule => {
653                self.schedule_next_check(|msg, delay| cx.post_after(cx, msg, delay));
654            }
655            PollResult::Processed { did_notify } => {
656                // Suppress heartbeats once any rank is terminating: the mesh is on
657                // its way down and subscribers will get a real state-change message
658                // for the terminal transition.
659                if !did_notify && !self.health_state.any_terminating() {
660                    send_heartbeat(cx, &self.health_state);
661                }
662                if !self.health_state.all_terminating() {
663                    self.schedule_next_check(|msg, delay| cx.post_after(cx, msg, delay));
664                } else {
665                    self.monitor.take();
666                }
667            }
668        }
669        Ok(())
670    }
671}
672
673impl<T: Controlled> Debug for ResourceController<T> {
674    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
675        f.debug_struct("ResourceController")
676            .field("mesh", &self.mesh)
677            .field("health_state", &self.health_state)
678            .field("monitor", &self.monitor)
679            .finish()
680    }
681}
682
// The controller takes part in the mesh `resource` protocol with no extra
// payload: both the spec and the state are the unit type.
impl<T: Controlled> resource::mesh::Mesh for ResourceController<T> {
    type Spec = ();
    type State = ();
}
687
688#[async_trait]
689impl<T: Controlled> Actor for ResourceController<T>
690where
691    resource::State<T::StateInner>: RemoteMessage,
692{
693    async fn init(&mut self, this: &Instance<Self>) -> Result<(), anyhow::Error> {
694        this.set_system();
695
696        // Subscribe to streaming state updates from the underlying agents so
697        // the controller receives state changes in real time, complementing
698        // the existing polling loop. Avoid binding the handle here: the
699        // controller's exported ports are bound when the mesh installs the
700        // ActorRef after spawn. Binding the same handle twice panics.
701        //
702        // This must happen before starting the monitor so that the first
703        // CheckState does not race the initial StreamState cast.
704        //
705        // TODO(SF, 2026-03-32, T261106175): follow up in hyperactor on bind
706        // semantics here. `cx.port()` plus later actor-ref export currently
707        // hits `bind()` -> `bind_handler_port()` on the same handle, and
708        // `bind_handler_port()` still panics on an already-bound handle. This
709        // workaround uses `attest_handler_port(...)` to avoid the eager
710        // bind, but the longer-term fix is to clarify whether that bind
711        // path should be idempotent and eliminate the need for attestation
712        // here.
713        let subscriber =
714            hyperactor::PortRef::<resource::State<T::StateInner>>::attest_handler_port(
715                &this.self_addr().clone(),
716            )
717            .unsplit();
718        self.mesh.subscribe_to_stream(this, subscriber)?;
719
720        // Start the monitor task.
721        self.monitor = Some(());
722        self.schedule_next_check(|msg, delay| this.post_after(this, msg, delay));
723
724        let owner = if let Some(owner) = &self.health_state.owner {
725            owner.to_string()
726        } else {
727            String::from("None")
728        };
729        tracing::info!(
730            actor_id = %this.self_addr(),
731            %owner,
732            "started resource controller for {}",
733            self.mesh.id()
734        );
735        Ok(())
736    }
737
738    async fn cleanup(
739        &mut self,
740        this: &Instance<Self>,
741        _err: Option<&ActorError>,
742    ) -> Result<(), anyhow::Error> {
743        if self.monitor.take().is_some() {
744            tracing::info!(
745                actor_id = %this.self_addr(),
746                mesh = %self.mesh.id(),
747                "starting cleanup for ResourceController, stopping mesh",
748            );
749            self.mesh
750                .cleanup_stop(this, "resource controller cleanup".to_string())
751                .await?;
752        }
753        Ok(())
754    }
755
756    async fn handle_undeliverable_message(
757        &mut self,
758        cx: &Instance<Self>,
759        mut envelope: Undeliverable<MessageEnvelope>,
760    ) -> Result<(), anyhow::Error> {
761        envelope = update_undeliverable_envelope_for_casting(envelope);
762        let Some(returned) = envelope.as_message() else {
763            return handle_undeliverable_message(cx, envelope);
764        };
765        if let Some(true) = returned.headers().get(ACTOR_MESH_SUBSCRIBER_MESSAGE) {
766            // Remove from the subscriber list (if it existed) so we don't
767            // send to this subscriber again.
768            // NOTE: The only part of the port that is used for equality checks is
769            // the port id, so create a new one just for the comparison.
770            let dest_port_id = returned.dest().clone();
771            let port = hyperactor::PortRef::<Option<MeshFailure>>::attest(dest_port_id);
772            let did_exist = self.health_state.subscribers.remove(&port);
773            if did_exist {
774                tracing::debug!(
775                    actor_id = %cx.self_addr(),
776                    num_subscribers = self.health_state.subscribers.len(),
777                    "ResourceController: removed subscriber {} from mesh controller",
778                    port.port_addr()
779                );
780            }
781            Ok(())
782        } else if returned.headers().get(CAST_ACTOR_MESH_ID).is_some() {
783            // A cast message we sent (e.g. StreamState or KeepaliveGetState)
784            // was returned by the CommActor because it could not be forwarded.
785            // This is expected when the network session is broken. Log and
786            // continue — the supervision polling loop will detect the failure.
787            tracing::warn!(
788                actor_id = %cx.self_addr(),
789                dest = %returned.dest(),
790                "ResourceController: ignoring undeliverable cast message",
791            );
792            Ok(())
793        } else {
794            handle_undeliverable_message(cx, envelope)
795        }
796    }
797}
798
799#[async_trait]
800impl<T: Controlled> Handler<Subscribe> for ResourceController<T>
801where
802    resource::State<T::StateInner>: RemoteMessage,
803{
804    async fn handle(&mut self, cx: &Context<Self>, message: Subscribe) -> anyhow::Result<()> {
805        // If there are any crashed ranks, replay a failure event so the new
806        // subscriber learns about the current health state. We send a single
807        // message with all crashed ranks so the subscriber's filter can check
808        // overlap with its slice region. This avoids the watch-channel
809        // coalescing problem (sending per-rank messages would lose all but
810        // the last one).
811        if let Some(unhealthy) = &self.health_state.unhealthy_event {
812            let msg = match unhealthy {
813                Unhealthy::StreamClosed(msg) | Unhealthy::Crashed(msg) => msg,
814            };
815            let mut replay_msg = msg.clone();
816            replay_msg.crashed_ranks = self.health_state.crashed_ranks.keys().copied().collect();
817            send_subscriber_message(cx, &message.0, replay_msg);
818        }
819        let port_id = message.0.port_addr().clone();
820        if self.health_state.subscribers.insert(message.0) {
821            tracing::debug!(
822                actor_id = %cx.self_addr(),
823                num_subscribers = self.health_state.subscribers.len(),
824                "added subscriber {} to mesh controller",
825                port_id
826            );
827        }
828        Ok(())
829    }
830}
831
832#[async_trait]
833impl<T: Controlled> Handler<Unsubscribe> for ResourceController<T>
834where
835    resource::State<T::StateInner>: RemoteMessage,
836{
837    async fn handle(&mut self, cx: &Context<Self>, message: Unsubscribe) -> anyhow::Result<()> {
838        if self.health_state.subscribers.remove(&message.0) {
839            tracing::debug!(
840                actor_id = %cx.self_addr(),
841                num_subscribers = self.health_state.subscribers.len(),
842                "removed subscriber {} from mesh controller",
843                message.0.port_addr()
844            );
845        }
846        Ok(())
847    }
848}
849
850#[async_trait]
851impl<T: Controlled> Handler<GetSubscriberCount> for ResourceController<T>
852where
853    resource::State<T::StateInner>: RemoteMessage,
854{
855    async fn handle(
856        &mut self,
857        cx: &Context<Self>,
858        message: GetSubscriberCount,
859    ) -> anyhow::Result<()> {
860        message.0.post(cx, self.health_state.subscribers.len());
861        Ok(())
862    }
863}
864
#[async_trait]
impl<T: Controlled> Handler<resource::CreateOrUpdate<resource::mesh::Spec<()>>>
    for ResourceController<T>
where
    resource::State<T::StateInner>: RemoteMessage,
{
    /// Currently a no-op as there's nothing to create or update, but allows
    /// `ResourceController` to implement the resource mesh behavior.
    ///
    /// Both the context and the message are ignored, and `Ok(())` is
    /// returned unconditionally.
    async fn handle(
        &mut self,
        _cx: &Context<Self>,
        _message: resource::CreateOrUpdate<resource::mesh::Spec<()>>,
    ) -> anyhow::Result<()> {
        Ok(())
    }
}
881
#[async_trait]
impl<T: Controlled> Handler<resource::GetState<resource::mesh::State<()>>> for ResourceController<T>
where
    resource::State<T::StateInner>: RemoteMessage,
{
    /// Delegates the request to `handle_get_state_msg` and returns its
    /// result unchanged.
    async fn handle(
        &mut self,
        cx: &Context<Self>,
        message: resource::GetState<resource::mesh::State<()>>,
    ) -> anyhow::Result<()> {
        self.handle_get_state_msg(cx, message)
    }
}
895
896#[async_trait]
897impl<T: Controlled> Handler<resource::Stop> for ResourceController<T>
898where
899    resource::State<T::StateInner>: RemoteMessage,
900{
901    async fn handle(&mut self, cx: &Context<Self>, message: resource::Stop) -> anyhow::Result<()> {
902        let mesh_name = self.mesh.id().clone();
903        tracing::info!(
904            name = "ResourceControllerStatus",
905            %mesh_name,
906            reason = %message.reason,
907            "stopping mesh"
908        );
909        if self.monitor.take().is_none() {
910            tracing::debug!(
911                actor_id = %cx.self_addr(),
912                %mesh_name,
913                "duplicate stop request, mesh is already stopped",
914            );
915            return Ok(());
916        }
917        let display = self.supervision_display_name();
918        self.mesh
919            .handle_stop_request(cx, &display, message.reason, &mut self.health_state)
920            .await
921    }
922}
923
#[async_trait]
impl<T: Controlled> Handler<resource::WaitRankStatus> for ResourceController<T>
where
    resource::State<T::StateInner>: RemoteMessage,
{
    /// Forward WaitRankStatus to the underlying agents. Each agent replies
    /// directly to the caller's accumulator port when its resource reaches
    /// the requested status.
    ///
    /// The controller itself does nothing with the message beyond handing it
    /// to the mesh's `forward_wait_rank_status`; that call's result is
    /// returned as-is.
    async fn handle(
        &mut self,
        cx: &Context<Self>,
        msg: resource::WaitRankStatus,
    ) -> anyhow::Result<()> {
        self.mesh.forward_wait_rank_status(cx, msg)
    }
}
940
#[async_trait]
impl<T: Controlled> Handler<CheckState> for ResourceController<T>
where
    resource::State<T::StateInner>: RemoteMessage,
{
    /// Unpacks the `CheckState` message and forwards its payload to
    /// `handle_check_state`, returning that call's result.
    async fn handle(
        &mut self,
        cx: &Context<Self>,
        CheckState(expected_time): CheckState,
    ) -> Result<(), anyhow::Error> {
        self.handle_check_state(cx, expected_time).await
    }
}
954
#[async_trait]
impl<T: Controlled> Handler<resource::State<T::StateInner>> for ResourceController<T>
where
    resource::State<T::StateInner>: RemoteMessage,
{
    /// Receives a resource state delivered to this controller and feeds it
    /// to the mesh's `process_state`, then calls `stop_if_all_terminating`.
    ///
    /// The boolean returned by `process_state` is not used here.
    async fn handle(
        &mut self,
        cx: &Context<Self>,
        state: resource::State<T::StateInner>,
    ) -> anyhow::Result<()> {
        self.mesh.process_state(cx, state, &mut self.health_state);
        self.stop_if_all_terminating();
        Ok(())
    }
}
970
971/// `Controlled` implementation for an actor mesh.
972#[async_trait]
973impl<A: Referable> Controlled for ActorMeshRef<A> {
974    type StateInner = ActorState;
975
    /// Returns the stall counter used for actor meshes,
    /// `ACTOR_MESH_CONTROLLER_SUPERVISION_STALLS`.
    fn stall_counter() -> &'static Counter<u64> {
        &ACTOR_MESH_CONTROLLER_SUPERVISION_STALLS
    }
979
    /// Returns the resource id of this actor mesh, obtained from the
    /// inherent `ActorMeshRef::id`.
    fn id(&self) -> &ResourceId {
        ActorMeshRef::id(self).resource_id()
    }
983
    /// Returns the region of this mesh, as reported by its `Ranked`
    /// implementation.
    fn region(&self) -> &ndslice::Region {
        ndslice::view::Ranked::region(self)
    }
987
988    fn subscribe_to_stream(
989        &self,
990        cx: &impl context::Actor,
991        subscriber: hyperactor::PortRef<resource::State<ActorState>>,
992    ) -> anyhow::Result<()> {
993        self.proc_mesh().agent_mesh().cast(
994            cx,
995            resource::StreamState::<ActorState> {
996                id: ActorMeshRef::id(self).resource_id().clone(),
997                subscriber,
998            },
999        )?;
1000        Ok(())
1001    }
1002
    /// Casts `msg` unchanged to the proc mesh's agent mesh. A failure to
    /// cast is returned as an error.
    fn forward_wait_rank_status(
        &self,
        cx: &impl context::Actor,
        msg: resource::WaitRankStatus,
    ) -> anyhow::Result<()> {
        self.proc_mesh().agent_mesh().cast(cx, msg)?;
        Ok(())
    }
1011
1012    async fn poll_states(
1013        &self,
1014        cx: &impl context::Actor,
1015        supervision_display_name: &str,
1016        health_state: &mut HealthState,
1017    ) -> PollResult {
1018        let mesh_name = Controlled::id(self);
1019
1020        // Actor-specific: first check if the proc mesh is dead before
1021        // trying to query their agents.
1022        let proc_states = self.proc_mesh().proc_states(cx, None).await;
1023        if let Err(e) = proc_states {
1024            send_state_change(
1025                cx,
1026                0,
1027                ActorSupervisionEvent::new(
1028                    cx.instance().self_addr().clone(),
1029                    None,
1030                    ActorStatus::generic_failure(format!(
1031                        "unable to query for proc states: {:?}",
1032                        e
1033                    )),
1034                    None,
1035                ),
1036                mesh_name,
1037                false,
1038                health_state,
1039            );
1040            return PollResult::Reschedule;
1041        }
1042        if let Some(proc_states) = proc_states.unwrap() {
1043            // Check if the proc mesh is still alive.
1044            if let Some((point, state)) = proc_states
1045                .iter()
1046                .find(|(_rank, state)| state.status.is_terminating())
1047            {
1048                // TODO: allow "actor supervision event" to be general, and
1049                // make the proc failure the cause. It is a hack to try to determine
1050                // the correct status based on process exit status.
1051                let actor_status =
1052                    proc_status_to_actor_status(state.state.and_then(|s| s.proc_status));
1053                let display = crate::actor_display_name(supervision_display_name, &point);
1054                send_state_change(
1055                    cx,
1056                    point.rank(),
1057                    ActorSupervisionEvent::new(
1058                        // Attribute this to the monitored actor, even if the underlying
1059                        // cause is a proc_failure. We propagate the cause explicitly.
1060                        self.get(point.rank()).unwrap().actor_addr().clone(),
1061                        Some(display),
1062                        actor_status,
1063                        None,
1064                    ),
1065                    mesh_name,
1066                    true,
1067                    health_state,
1068                );
1069                return PollResult::Reschedule;
1070            }
1071        }
1072
1073        // Query resource states with keepalive.
1074        let actor_states = self
1075            .actor_states_with_keepalive(cx, compute_keepalive())
1076            .await;
1077        match actor_states {
1078            Err(e) => {
1079                send_state_change(
1080                    cx,
1081                    0,
1082                    ActorSupervisionEvent::new(
1083                        cx.instance().self_addr().clone(),
1084                        Some(supervision_display_name.to_string()),
1085                        ActorStatus::generic_failure(format!(
1086                            "unable to query for actor states: {:?}",
1087                            e
1088                        )),
1089                        None,
1090                    ),
1091                    mesh_name,
1092                    false,
1093                    health_state,
1094                );
1095                PollResult::Reschedule
1096            }
1097            Ok(states) => {
1098                let did_notify =
1099                    health_state.apply_updates_and_notify(&states, |state, health_state| {
1100                        let (rank, events) = actor_state_to_supervision_events(state);
1101                        if events.is_empty() {
1102                            return false;
1103                        }
1104                        send_state_change(
1105                            cx,
1106                            rank,
1107                            events[0].clone(),
1108                            mesh_name,
1109                            false,
1110                            health_state,
1111                        );
1112                        true
1113                    });
1114                PollResult::Processed { did_notify }
1115            }
1116        }
1117    }
1118
1119    fn process_state(
1120        &self,
1121        cx: &impl context::Actor,
1122        state: resource::State<ActorState>,
1123        health_state: &mut HealthState,
1124    ) -> bool {
1125        let (rank, events) = actor_state_to_supervision_events(state.clone());
1126        let Ok(point) = Controlled::region(self).extent().point_of_rank(rank) else {
1127            return false;
1128        };
1129
1130        let changed = health_state.maybe_update(point, state.status, state.generation);
1131
1132        if changed && !events.is_empty() {
1133            send_state_change(
1134                cx,
1135                rank,
1136                events[0].clone(),
1137                Controlled::id(self),
1138                false,
1139                health_state,
1140            );
1141            true
1142        } else {
1143            false
1144        }
1145    }
1146
1147    async fn handle_stop_request(
1148        &self,
1149        cx: &impl context::Actor,
1150        _supervision_display_name: &str,
1151        reason: String,
1152        health_state: &mut HealthState,
1153    ) -> anyhow::Result<()> {
1154        let mesh_name = Controlled::id(self);
1155        tracing::info!(
1156            actor_id = %cx.instance().self_addr(),
1157            actor_mesh = %mesh_name,
1158            "forwarding stop request from ActorMeshController to proc mesh"
1159        );
1160
1161        // Let the client know that the controller has stopped. Since the monitor
1162        // is cancelled, it will not alert the owner or the subscribers.
1163        // We use a placeholder rank to get an actor id, but really there should
1164        // be a stop event for every rank in the mesh. Since every rank has the
1165        // same owner, we assume the rank doesn't matter, and the owner can just
1166        // assume the stop happened on all actors.
1167        let rank = 0usize;
1168        let event = ActorSupervisionEvent::new(
1169            self.get(rank)
1170                .expect("mesh must have at least one rank")
1171                .actor_addr()
1172                .clone(),
1173            None,
1174            ActorStatus::Stopped("ActorMeshController received explicit stop request".to_string()),
1175            None,
1176        );
1177        let failure_message = MeshFailure {
1178            actor_mesh_name: Some(mesh_name.to_string()),
1179            event,
1180            crashed_ranks: vec![],
1181        };
1182        health_state.unhealthy_event = Some(Unhealthy::StreamClosed(failure_message.clone()));
1183        // We don't send a message to the owner on stops, because only the owner
1184        // can request a stop. We just send to subscribers instead, as they did
1185        // not request the stop themselves.
1186        for subscriber in health_state.subscribers.iter() {
1187            send_subscriber_message(cx, subscriber, failure_message.clone());
1188        }
1189
1190        // max_rank and extent are only needed for the deprecated RankedValues.
1191        // TODO: add cmp::Ord to Point for a max() impl.
1192        let max_rank = health_state.statuses.keys().map(|p| p.rank()).max();
1193        let extent = health_state
1194            .statuses
1195            .keys()
1196            .next()
1197            .map(|p| p.extent().clone());
1198
1199        // Cannot use "ActorMesh::stop" as it tries to message the controller.
1200        let result = self
1201            .proc_mesh()
1202            .stop_actor_by_id(cx, ActorMeshRef::id(self).clone(), reason)
1203            .await;
1204
1205        match result {
1206            Ok(statuses) => {
1207                // All stops successful, set actor status on health state.
1208                for (rank, status) in statuses.iter() {
1209                    health_state
1210                        .statuses
1211                        .entry(rank)
1212                        .and_modify(move |s| *s = (status, u64::MAX));
1213                }
1214            }
1215            Err(crate::Error::ActorStopError { statuses }) => {
1216                if let Some(max_rank) = max_rank {
1217                    let extent = extent.expect("no actors in mesh");
1218                    for (rank, status) in statuses.materialized_iter(max_rank).enumerate() {
1219                        *health_state
1220                            .statuses
1221                            .get_mut(&extent.point_of_rank(rank).expect("illegal rank"))
1222                            .unwrap() = (status.clone(), u64::MAX);
1223                    }
1224                }
1225            }
1226            Err(e) => {
1227                return Err(e.into());
1228            }
1229        }
1230
1231        tracing::info!(
1232            actor_id = %cx.instance().self_addr(),
1233            actor_mesh = %mesh_name,
1234            "stopped mesh"
1235        );
1236        Ok(())
1237    }
1238
    /// Stops this mesh's actors through `stop_actor_by_id` on the proc mesh.
    ///
    /// The statuses returned on success are discarded; an error from the
    /// stop is propagated.
    async fn cleanup_stop(&self, cx: &impl context::Actor, reason: String) -> anyhow::Result<()> {
        self.proc_mesh()
            .stop_actor_by_id(cx, ActorMeshRef::id(self).clone(), reason)
            .await?;
        Ok(())
    }
1245}
1246
1247/// Controller for a proc mesh.
1248pub(crate) type ProcMeshController = ResourceController<ProcMeshRef>;
1249
1250/// `Controlled` implementation for a proc mesh.
1251#[async_trait]
1252impl Controlled for ProcMeshRef {
1253    type StateInner = crate::host_mesh::host_agent::ProcState;
1254
    /// Returns the stall counter used for proc meshes,
    /// `PROC_MESH_CONTROLLER_SUPERVISION_STALLS`.
    fn stall_counter() -> &'static Counter<u64> {
        &PROC_MESH_CONTROLLER_SUPERVISION_STALLS
    }
1258
    /// Returns the resource id of this proc mesh, obtained from the
    /// inherent `ProcMeshRef::id`.
    fn id(&self) -> &ResourceId {
        ProcMeshRef::id(self).resource_id()
    }
1262
    /// Returns the region of this mesh, as reported by its `Ranked`
    /// implementation.
    fn region(&self) -> &ndslice::Region {
        ndslice::view::Ranked::region(self)
    }
1266
1267    fn subscribe_to_stream(
1268        &self,
1269        cx: &impl context::Actor,
1270        subscriber: hyperactor::PortRef<resource::State<Self::StateInner>>,
1271    ) -> anyhow::Result<()> {
1272        // Send one StreamState per proc to its host agent.
1273        for proc_id in self.proc_ids() {
1274            let proc_resource_id = ResourceId::new(proc_id.uid().clone(), proc_id.label().cloned());
1275            let host = crate::host_mesh::HostRef(proc_id.addr().clone());
1276            host.mesh_agent().post(
1277                cx,
1278                resource::StreamState::<Self::StateInner> {
1279                    id: proc_resource_id,
1280                    subscriber: subscriber.clone(),
1281                },
1282            );
1283        }
1284        Ok(())
1285    }
1286
1287    fn forward_wait_rank_status(
1288        &self,
1289        cx: &impl context::Actor,
1290        msg: resource::WaitRankStatus,
1291    ) -> anyhow::Result<()> {
1292        for proc_id in self.proc_ids() {
1293            let host = crate::host_mesh::HostRef(proc_id.addr().clone());
1294            host.mesh_agent().post(cx, msg.clone());
1295        }
1296        Ok(())
1297    }
1298
1299    async fn poll_states(
1300        &self,
1301        cx: &impl context::Actor,
1302        supervision_display_name: &str,
1303        health_state: &mut HealthState,
1304    ) -> PollResult {
1305        let mesh_name = Controlled::id(self);
1306
1307        let proc_states = self.proc_states(cx, compute_keepalive()).await;
1308        match proc_states {
1309            Err(e) => {
1310                send_state_change(
1311                    cx,
1312                    0,
1313                    ActorSupervisionEvent::new(
1314                        cx.instance().self_addr().clone(),
1315                        Some(supervision_display_name.to_string()),
1316                        ActorStatus::generic_failure(format!(
1317                            "unable to query for proc states: {:?}",
1318                            e
1319                        )),
1320                        None,
1321                    ),
1322                    mesh_name,
1323                    false,
1324                    health_state,
1325                );
1326                PollResult::Reschedule
1327            }
1328            Ok(None) => PollResult::Processed { did_notify: false },
1329            Ok(Some(states)) => {
1330                let did_notify =
1331                    health_state.apply_updates_and_notify(&states, |state, health_state| {
1332                        self.notify_proc_state_change(
1333                            cx,
1334                            supervision_display_name,
1335                            state,
1336                            health_state,
1337                        )
1338                    });
1339                PollResult::Processed { did_notify }
1340            }
1341        }
1342    }
1343
1344    fn process_state(
1345        &self,
1346        cx: &impl context::Actor,
1347        state: resource::State<Self::StateInner>,
1348        health_state: &mut HealthState,
1349    ) -> bool {
1350        let Ok(point) = Controlled::region(self).extent().point_of_rank(
1351            state
1352                .state
1353                .as_ref()
1354                .map(|s| s.create_rank)
1355                .unwrap_or(usize::MAX),
1356        ) else {
1357            return false;
1358        };
1359        let changed = health_state.maybe_update(point, state.status.clone(), state.generation);
1360        if !changed {
1361            return false;
1362        }
1363        let display = Controlled::id(self).to_string();
1364        self.notify_proc_state_change(cx, &display, state, health_state)
1365    }
1366
1367    async fn handle_stop_request(
1368        &self,
1369        cx: &impl context::Actor,
1370        _supervision_display_name: &str,
1371        reason: String,
1372        health_state: &mut HealthState,
1373    ) -> anyhow::Result<()> {
1374        let mesh_name = Controlled::id(self);
1375        tracing::info!(
1376            actor_id = %cx.instance().self_addr(),
1377            proc_mesh = %mesh_name,
1378            "ProcMeshController stopping proc mesh"
1379        );
1380        // Marker so subscribers know the mesh is being torn down on request.
1381        let event = ActorSupervisionEvent::new(
1382            cx.instance().self_addr().clone(),
1383            None,
1384            ActorStatus::Stopped("ProcMeshController received explicit stop request".to_string()),
1385            None,
1386        );
1387        let failure_message = MeshFailure {
1388            actor_mesh_name: Some(mesh_name.to_string()),
1389            event,
1390            crashed_ranks: vec![],
1391        };
1392        health_state.unhealthy_event = Some(Unhealthy::StreamClosed(failure_message.clone()));
1393        for subscriber in health_state.subscribers.iter() {
1394            send_subscriber_message(cx, subscriber, failure_message.clone());
1395        }
1396
1397        let names = self.proc_ids().collect::<Vec<hyperactor::ProcAddr>>();
1398        let region = Ranked::region(self).clone();
1399        let Some(hosts) = self.hosts() else {
1400            return Ok(());
1401        };
1402        // stop_proc_mesh waits for every rank to reach a terminating state
1403        // before returning Ok, so we can apply its returned StatusMesh
1404        // verbatim. On error we still got per-rank statuses for whatever
1405        // ranks the host agents reported on; apply those too so health
1406        // state stays as accurate as we can make it.
1407        let max_rank = health_state.statuses.keys().map(|p| p.rank()).max();
1408        let extent = health_state
1409            .statuses
1410            .keys()
1411            .next()
1412            .map(|p| p.extent().clone());
1413        match hosts
1414            .stop_proc_mesh(cx, self.id(), names, region, reason)
1415            .await
1416        {
1417            Ok(statuses) => {
1418                for (rank, status) in statuses.iter() {
1419                    health_state
1420                        .statuses
1421                        .entry(rank)
1422                        .and_modify(move |s| *s = (status, u64::MAX));
1423                }
1424                Ok(())
1425            }
1426            Err(crate::Error::ProcMeshStopError { statuses }) => {
1427                if let (Some(max_rank), Some(extent)) = (max_rank, extent) {
1428                    for (rank, status) in statuses.materialized_iter(max_rank).enumerate() {
1429                        if let Ok(point) = extent.point_of_rank(rank) {
1430                            health_state
1431                                .statuses
1432                                .entry(point)
1433                                .and_modify(|s| *s = (status.clone(), u64::MAX));
1434                        }
1435                    }
1436                }
1437                Err(crate::Error::ProcMeshStopError { statuses }.into())
1438            }
1439            Err(e) => Err(e.into()),
1440        }
1441    }
1442
1443    async fn cleanup_stop(&self, cx: &impl context::Actor, reason: String) -> anyhow::Result<()> {
1444        let names = self.proc_ids().collect::<Vec<hyperactor::ProcAddr>>();
1445        let region = Ranked::region(self).clone();
1446        if let Some(hosts) = self.hosts() {
1447            hosts
1448                .stop_proc_mesh(cx, self.id(), names, region, reason)
1449                .await?;
1450        }
1451        Ok(())
1452    }
1453}
1454
1455impl ProcMeshRef {
1456    /// Translate a polled or streamed `State<ProcState>` into a supervision
1457    /// event on this proc-mesh controller. Returns `true` if a notification
1458    /// was sent (which suppresses the heartbeat path).
1459    fn notify_proc_state_change(
1460        &self,
1461        cx: &impl context::Actor,
1462        supervision_display_name: &str,
1463        state: resource::State<crate::host_mesh::host_agent::ProcState>,
1464        health_state: &mut HealthState,
1465    ) -> bool {
1466        let create_rank = state.state.as_ref().map(|s| s.create_rank);
1467        let actor_status = proc_status_to_actor_status(state.state.and_then(|s| s.proc_status));
1468        let event = ActorSupervisionEvent::new(
1469            cx.instance().self_addr().clone(),
1470            Some(supervision_display_name.to_string()),
1471            actor_status,
1472            None,
1473        );
1474        let rank = create_rank
1475            .and_then(|r| {
1476                ndslice::view::Ranked::region(self)
1477                    .extent()
1478                    .point_of_rank(r)
1479                    .ok()
1480            })
1481            .map(|p| p.rank())
1482            .unwrap_or(0);
1483        send_state_change(cx, rank, event, Controlled::id(self), true, health_state);
1484        true
1485    }
1486}
1487
1488#[cfg(test)]
1489mod tests {
1490    use std::ops::Deref;
1491    use std::time::Duration;
1492
1493    use hyperactor::actor::ActorStatus;
1494    use hyperactor::id::Label;
1495    use ndslice::Extent;
1496    use ndslice::ViewExt;
1497
1498    #[cfg(fbcode_build)]
1499    use super::SUPERVISION_POLL_FREQUENCY;
1500    use super::proc_status_to_actor_status;
1501    use crate::ActorMesh;
1502    use crate::bootstrap::ProcStatus;
1503    #[cfg(fbcode_build)]
1504    use crate::host_mesh::PROC_SPAWN_MAX_IDLE;
1505    use crate::mesh_id::ActorMeshId;
1506    #[cfg(fbcode_build)]
1507    use crate::mesh_id::HostMeshId;
1508    use crate::proc_agent::MESH_ORPHAN_TIMEOUT;
1509    use crate::resource;
1510    #[cfg(fbcode_build)]
1511    use crate::supervision::MeshFailure;
1512    use crate::test_utils::local_host_mesh;
1513    use crate::testactor;
1514    use crate::testing;
1515
1516    /// Wraps a host mesh's shutdown guard and the spawned host child
1517    /// processes so tests can simulate an unclean host crash by killing
1518    /// the children directly rather than asking an in-mesh actor to
1519    /// `process::exit`, which can also tear down the test binary.
1520    #[cfg(fbcode_build)]
1521    struct TestHostMesh {
1522        guard: crate::host_mesh::HostMeshShutdownGuard,
1523        children: Vec<tokio::process::Child>,
1524    }
1525
1526    #[cfg(fbcode_build)]
1527    impl TestHostMesh {
1528        async fn kill_hosts(&mut self) {
1529            for child in &mut self.children {
1530                let _ = child.start_kill();
1531                let _ = child.wait().await;
1532            }
1533            self.children.clear();
1534        }
1535    }
1536
1537    #[cfg(fbcode_build)]
1538    impl std::ops::Deref for TestHostMesh {
1539        type Target = crate::host_mesh::HostMeshShutdownGuard;
1540
1541        fn deref(&self) -> &Self::Target {
1542            &self.guard
1543        }
1544    }
1545
1546    #[cfg(fbcode_build)]
1547    impl std::ops::DerefMut for TestHostMesh {
1548        fn deref_mut(&mut self) -> &mut Self::Target {
1549            &mut self.guard
1550        }
1551    }
1552
1553    /// Verify that actors spawned without a controller are cleaned up
1554    /// when their keepalive expiry lapses. We:
1555    ///   1. Enable the orphan timeout on the `ProcMeshAgent`.
1556    ///   2. Spawn actors as *system actors* (no `ActorMeshController`).
1557    ///   3. Send a single keepalive with a short expiry time.
1558    ///   4. Wait for the expiry to pass and `SelfCheck` to fire.
1559    ///   5. Assert that the actors are now stopped.
1560    #[tokio::test]
1561    async fn test_orphaned_actors_are_cleaned_up() {
1562        let config = hyperactor_config::global::lock();
1563        // Short orphan timeout so SelfCheck fires frequently.
1564        let _orphan = config.override_key(MESH_ORPHAN_TIMEOUT, Some(Duration::from_secs(1)));
1565
1566        let instance = testing::instance();
1567        let host_mesh = local_host_mesh(2).await;
1568        let proc_mesh = host_mesh
1569            .spawn(instance, "test", Extent::unity(), None, None)
1570            .await
1571            .unwrap();
1572
1573        let actor_name = ActorMeshId::instance(Label::new("orphan_test").unwrap());
1574        // Spawn as a system actor so no controller is created. This lets us
1575        // control keepalive messages directly without the controller
1576        // interfering.
1577        let actor_mesh: ActorMesh<testactor::TestActor> = proc_mesh
1578            .spawn_with_name(instance, actor_name.clone(), &(), None, true)
1579            .await
1580            .unwrap();
1581        assert!(
1582            actor_mesh.deref().extent().num_ranks() > 0,
1583            "should have spawned at least one actor"
1584        );
1585
1586        // Send a keepalive with a short expiry. This is what the
1587        // ActorMeshController would normally do on each supervision poll.
1588        let states = proc_mesh
1589            .actor_states_with_keepalive(
1590                instance,
1591                actor_name.clone(),
1592                Some(std::time::SystemTime::now() + Duration::from_secs(2)),
1593            )
1594            .await
1595            .unwrap();
1596        // All actors should be running right now.
1597        for state in states.values() {
1598            assert_eq!(
1599                state.status,
1600                resource::Status::Running,
1601                "actor should be running before expiry"
1602            );
1603        }
1604
1605        // Poll until all actors are stopped, rather than sleeping a
1606        // fixed duration. The expiry is 2s and SelfCheck fires every 1s,
1607        // so this should converge quickly, but we allow a generous timeout
1608        // for slow CI environments.
1609        let deadline = tokio::time::Instant::now() + Duration::from_secs(30);
1610        loop {
1611            let states = proc_mesh
1612                .actor_states(instance, actor_name.clone())
1613                .await
1614                .unwrap();
1615            if states
1616                .values()
1617                .all(|s| s.status == resource::Status::Stopped)
1618            {
1619                break;
1620            }
1621            assert!(
1622                tokio::time::Instant::now() < deadline,
1623                "timed out waiting for actors to be stopped after keepalive expiry"
1624            );
1625            tokio::time::sleep(Duration::from_millis(200)).await;
1626        }
1627    }
1628
1629    /// Create a multi-process host mesh that propagates the current
1630    /// process's config overrides to child processes via Bootstrap.
1631    #[cfg(fbcode_build)]
1632    async fn host_mesh_with_config(n: usize) -> TestHostMesh {
1633        use hyperactor::channel::ChannelTransport;
1634        use tokio::process::Command;
1635
1636        let program = crate::testresource::get("monarch/hyperactor_mesh/bootstrap");
1637        let mut host_addrs = vec![];
1638        let mut children = Vec::new();
1639        for _ in 0..n {
1640            host_addrs.push(ChannelTransport::Unix.any());
1641        }
1642
1643        for host in host_addrs.iter() {
1644            let mut cmd = Command::new(program.clone());
1645            let boot = crate::Bootstrap::Host {
1646                addr: host.clone(),
1647                command: None,
1648                config: Some(hyperactor_config::global::attrs()),
1649                exit_on_shutdown: false,
1650            };
1651            boot.to_env(&mut cmd);
1652            cmd.kill_on_drop(false);
1653            // SAFETY: pre_exec sets PR_SET_PDEATHSIG so the child is
1654            // cleaned up if the parent (test) process dies.
1655            unsafe {
1656                cmd.pre_exec(crate::bootstrap::install_pdeathsig_kill);
1657            }
1658            children.push(cmd.spawn().unwrap());
1659        }
1660
1661        let host_mesh = crate::HostMeshRef::from_hosts(
1662            HostMeshId::instance(Label::new("test").unwrap()),
1663            host_addrs,
1664        );
1665        TestHostMesh {
1666            guard: crate::host_mesh::HostMesh::take(host_mesh).shutdown_guard(),
1667            children,
1668        }
1669    }
1670
1671    /// Verify that actors are cleaned up via the orphan timeout when the
1672    /// `ActorMeshController`'s host process crashes. Unlike the system-actor
1673    /// test above, this spawns actors through a real controller (via
1674    /// `WrapperActor`) and then kills the controller's host process
1675    /// uncleanly. The agents on the surviving proc mesh detect the expired
1676    /// keepalive and stop the actors.
1677    #[tokio::test]
1678    #[cfg(fbcode_build)]
1679    async fn test_orphaned_actors_cleaned_up_on_controller_crash() {
1680        let config = hyperactor_config::global::lock();
1681        let _orphan = config.override_key(MESH_ORPHAN_TIMEOUT, Some(Duration::from_secs(2)));
1682        let _poll = config.override_key(SUPERVISION_POLL_FREQUENCY, Duration::from_secs(1));
1683        let _proc_spawn = config.override_key(PROC_SPAWN_MAX_IDLE, Duration::from_secs(60));
1684        let _host_spawn = config.override_key(
1685            hyperactor::config::HOST_SPAWN_READY_TIMEOUT,
1686            Duration::from_secs(60),
1687        );
1688
1689        let instance = testing::instance();
1690        let num_replicas = 1;
1691
1692        // Host mesh for the test actors (these survive the crash).
1693        // host_mesh_with_config propagates config overrides to child
1694        // processes via Bootstrap, so agents boot with
1695        // MESH_ORPHAN_TIMEOUT=2s and start the SelfCheck loop.
1696        let mut actor_hm = host_mesh_with_config(num_replicas).await;
1697        let actor_proc_mesh = actor_hm
1698            .spawn(instance, "actors", Extent::unity(), None, None)
1699            .await
1700            .unwrap();
1701
1702        // Host mesh for the wrapper + controller (will be killed).
1703        let mut controller_hm = host_mesh_with_config(1).await;
1704        let controller_proc_mesh = controller_hm
1705            .spawn(instance, "controller", Extent::unity(), None, None)
1706            .await
1707            .unwrap();
1708
1709        let child_name = ActorMeshId::instance(Label::new("orphan_child").unwrap());
1710
1711        // Supervision port required by WrapperActor params.
1712        let (supervision_port, _supervision_receiver) = instance.open_port::<MeshFailure>();
1713        let supervisor = supervision_port.bind();
1714
1715        // Spawn WrapperActor on controller_proc_mesh. Its init() spawns
1716        // ActorMesh<TestActor> on actor_proc_mesh with a real
1717        // ActorMeshController co-located on the controller's process.
1718        let _wrapper_mesh: ActorMesh<testactor::WrapperActor> = controller_proc_mesh
1719            .spawn(
1720                instance,
1721                "wrapper",
1722                &(
1723                    actor_proc_mesh.deref().clone(),
1724                    supervisor,
1725                    child_name.clone(),
1726                ),
1727            )
1728            .await
1729            .unwrap();
1730
1731        // Give the controller time to run at least one CheckState cycle
1732        // (polling every 1s). This is what sends `KeepaliveGetState` to
1733        // each agent, and is what arms the agent's `expiry_time` so the
1734        // agent's `SelfCheck` reaper can cull the actors after the
1735        // controller dies. Polling for `Running` is not enough: actors
1736        // reach `Running` at spawn time before the controller's first
1737        // poll, and if we kill the controller in that window the agents
1738        // never received a keepalive and the orphan timeout never trips.
1739        tokio::time::sleep(Duration::from_secs(3)).await;
1740        let states = actor_proc_mesh
1741            .actor_states(instance, child_name.clone())
1742            .await
1743            .unwrap();
1744        for state in states.values() {
1745            assert_eq!(
1746                state.status,
1747                resource::Status::Running,
1748                "actor should be running before controller crash"
1749            );
1750        }
1751
1752        // Kill the controller's host process uncleanly. The TestActors on
1753        // actor_proc_mesh survive. Killing the host (rather than asking the
1754        // wrapper actor to `process::exit`) is critical: the wrapper runs
1755        // in this same test binary's address space when the host mesh is
1756        // co-located, so an in-process exit would also tear down the test
1757        // runner.
1758        controller_hm.kill_hosts().await;
1759
1760        // Poll until all actors are stopped via the orphan timeout. The
1761        // configured timeout is 2s and `SelfCheck` fires every 2s, so this
1762        // converges quickly; allow generous slack for slow CI environments.
1763        let deadline = tokio::time::Instant::now() + Duration::from_secs(30);
1764        loop {
1765            let states = actor_proc_mesh
1766                .actor_states(instance, child_name.clone())
1767                .await
1768                .unwrap();
1769            if states
1770                .values()
1771                .all(|s| s.status == resource::Status::Stopped)
1772            {
1773                break;
1774            }
1775            assert!(
1776                tokio::time::Instant::now() < deadline,
1777                "timed out waiting for actors to be stopped after controller crash and orphan timeout"
1778            );
1779            tokio::time::sleep(Duration::from_millis(200)).await;
1780        }
1781
1782        let _ = actor_hm.shutdown(instance).await;
1783    }
1784
1785    #[test]
1786    fn test_proc_status_to_actor_status_stopped_cleanly() {
1787        let status = proc_status_to_actor_status(Some(ProcStatus::Stopped {
1788            exit_code: 0,
1789            stderr_tail: vec![],
1790        }));
1791        assert!(
1792            matches!(status, ActorStatus::Stopped(ref msg) if msg.contains("cleanly")),
1793            "expected Stopped, got {:?}",
1794            status
1795        );
1796    }
1797
1798    #[test]
1799    fn test_proc_status_to_actor_status_nonzero_exit() {
1800        let status = proc_status_to_actor_status(Some(ProcStatus::Stopped {
1801            exit_code: 1,
1802            stderr_tail: vec![],
1803        }));
1804        assert!(
1805            matches!(status, ActorStatus::Failed(_)),
1806            "expected Failed, got {:?}",
1807            status
1808        );
1809    }
1810
1811    #[test]
1812    fn test_proc_status_to_actor_status_stopping_is_not_a_failure() {
1813        let status = proc_status_to_actor_status(Some(ProcStatus::Stopping {
1814            started_at: std::time::SystemTime::now(),
1815        }));
1816        assert!(
1817            matches!(status, ActorStatus::Stopped(ref msg) if msg.contains("stopping")),
1818            "expected Stopped, got {:?}",
1819            status
1820        );
1821    }
1822
1823    #[test]
1824    fn test_proc_status_to_actor_status_none() {
1825        let status = proc_status_to_actor_status(None);
1826        assert!(
1827            matches!(status, ActorStatus::Stopped(_)),
1828            "expected Stopped, got {:?}",
1829            status
1830        );
1831    }
1832
1833    #[test]
1834    fn test_proc_status_to_actor_status_killed() {
1835        let status = proc_status_to_actor_status(Some(ProcStatus::Killed {
1836            signal: 9,
1837            core_dumped: false,
1838        }));
1839        assert!(
1840            matches!(status, ActorStatus::Failed(_)),
1841            "expected Failed, got {:?}",
1842            status
1843        );
1844    }
1845
1846    #[test]
1847    fn test_proc_status_to_actor_status_failed() {
1848        let status = proc_status_to_actor_status(Some(ProcStatus::Failed {
1849            reason: "oom".to_string(),
1850        }));
1851        assert!(
1852            matches!(status, ActorStatus::Failed(_)),
1853            "expected Failed, got {:?}",
1854            status
1855        );
1856    }
1857}