hyperactor_mesh/
mesh_controller.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9use std::collections::HashMap;
10use std::collections::HashSet;
11use std::fmt::Debug;
12
13use async_trait::async_trait;
14use hyperactor::Actor;
15use hyperactor::Bind;
16use hyperactor::Context;
17use hyperactor::Handler;
18use hyperactor::Instance;
19use hyperactor::Unbind;
20use hyperactor::actor::ActorError;
21use hyperactor::actor::ActorErrorKind;
22use hyperactor::actor::ActorStatus;
23use hyperactor::actor::Referable;
24use hyperactor::actor::handle_undeliverable_message;
25use hyperactor::context;
26use hyperactor::kv_pairs;
27use hyperactor::mailbox::MessageEnvelope;
28use hyperactor::mailbox::Undeliverable;
29use hyperactor::reference as hyperactor_reference;
30use hyperactor::supervision::ActorSupervisionEvent;
31use hyperactor_config::CONFIG;
32use hyperactor_config::ConfigAttr;
33use hyperactor_config::Flattrs;
34use hyperactor_config::attrs::declare_attrs;
35use hyperactor_telemetry::declare_static_counter;
36use ndslice::ViewExt;
37use ndslice::view::CollectMeshExt;
38use ndslice::view::Point;
39use ndslice::view::Ranked;
40use serde::Deserialize;
41use serde::Serialize;
42use tokio::time::Duration;
43use typeuri::Named;
44
45use crate::Name;
46use crate::ValueMesh;
47use crate::actor_mesh::ActorMeshRef;
48use crate::bootstrap::ProcStatus;
49use crate::casting::CAST_ACTOR_MESH_ID;
50use crate::casting::update_undeliverable_envelope_for_casting;
51use crate::host_mesh::HostMeshRef;
52use crate::proc_agent::ActorState;
53use crate::proc_agent::MESH_ORPHAN_TIMEOUT;
54use crate::proc_mesh::ProcMeshRef;
55use crate::resource;
56use crate::supervision::MeshFailure;
57use crate::supervision::Unhealthy;
58
/// Actor name for `ActorMeshController` when spawned as a named child.
/// NOTE(review): presumably used as the child-actor name at spawn sites —
/// confirm against the proc-mesh spawn code.
pub const ACTOR_MESH_CONTROLLER_NAME: &str = "actor_mesh_controller";
61
declare_attrs! {
    /// Time between checks of actor states to create supervision events for
    /// owners. The longer this is, the longer it will take to detect a failure
    /// and report it to all subscribers; however, shorter intervals will send
    /// more frequent messages and heartbeats just to see everything is still running.
    /// The default (10 seconds) is chosen to balance these two objectives.
    /// This also controls how frequently the healthy heartbeat is sent out to
    /// subscribers if there are no failures encountered.
    /// Overridable via the `HYPERACTOR_MESH_SUPERVISION_POLL_FREQUENCY`
    /// environment variable.
    @meta(CONFIG = ConfigAttr::new(
        Some("HYPERACTOR_MESH_SUPERVISION_POLL_FREQUENCY".to_string()),
        None,
    ))
    pub attr SUPERVISION_POLL_FREQUENCY: Duration = Duration::from_secs(10);
}
76
// Telemetry counter for supervision-loop stalls (metric name
// `actor.actor_mesh_controller.num_stalls`). The incrementing site is
// outside this chunk — presumably the `CheckState` handler, which carries
// the expected next poll time; confirm.
declare_static_counter!(
    ACTOR_MESH_CONTROLLER_SUPERVISION_STALLS,
    "actor.actor_mesh_controller.num_stalls"
);
81
/// Aggregated health bookkeeping for the mesh monitored by an
/// `ActorMeshController`: per-rank statuses, the latest unhealthy event,
/// crashed ranks, and the notification targets (owner + subscribers).
#[derive(Debug)]
struct HealthState {
    /// The status of each actor in the controlled mesh, paired with the
    /// generation counter from the most recent update. The generation is
    /// used for last-writer-wins ordering between streamed and polled updates.
    statuses: HashMap<Point, (resource::Status, u64)>,
    /// The most recent unhealthy event observed, if any. `None` means the
    /// mesh is considered healthy.
    unhealthy_event: Option<Unhealthy>,
    /// Ranks that have crashed, keyed by rank, with the supervision event
    /// that reported the crash. Used to replay failures to new subscribers.
    crashed_ranks: HashMap<usize, ActorSupervisionEvent>,
    /// The unique owner of this actor. Owners are notified of failures only,
    /// not of clean stops (which they initiate themselves).
    owner: Option<hyperactor_reference::PortRef<MeshFailure>>,
    /// A set of subscribers to send messages to when events are encountered.
    subscribers: HashSet<hyperactor_reference::PortRef<Option<MeshFailure>>>,
}
95
96impl HealthState {
97    fn new(
98        statuses: HashMap<Point, resource::Status>,
99        owner: Option<hyperactor_reference::PortRef<MeshFailure>>,
100    ) -> Self {
101        Self {
102            statuses: statuses
103                .into_iter()
104                .map(|(point, status)| (point, (status, 0)))
105                .collect(),
106            unhealthy_event: None,
107            crashed_ranks: HashMap::new(),
108            owner,
109            subscribers: HashSet::new(),
110        }
111    }
112
113    /// Try to update the status at `point`. Returns `true` if the status
114    /// was newly inserted or changed; `false` if dominated by a higher
115    /// generation or unchanged.
116    fn maybe_update(&mut self, point: Point, status: resource::Status, generation: u64) -> bool {
117        use std::collections::hash_map::Entry;
118        match self.statuses.entry(point) {
119            Entry::Occupied(mut entry) => {
120                let (old_status, old_gen) = entry.get();
121                // Once a resource enters a terminating state (including Stopping),
122                // its status is frozen — later updates are ignored.
123                if old_status.is_terminating() || *old_gen > generation {
124                    return false;
125                }
126                let changed = *old_status != status;
127                *entry.get_mut() = (status, generation);
128                changed
129            }
130            Entry::Vacant(entry) => {
131                entry.insert((status, generation));
132                true
133            }
134        }
135    }
136}
137
/// Subscribe me to updates about a mesh. If a duplicate is subscribed, only a single
/// message is sent.
/// Will periodically send `None` if there are no failures on the mesh. This guarantees
/// the listener that the controller is still alive. Make sure to filter such events
/// out as not useful.
/// If the mesh is already unhealthy at subscription time, the current failure
/// state is replayed to the new subscriber immediately.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Named, Bind, Unbind)]
pub struct Subscribe(pub hyperactor_reference::PortRef<Option<MeshFailure>>);
145
/// Unsubscribe me from future updates about a mesh. Should be the same port used in
/// the Subscribe message.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Named, Bind, Unbind)]
pub struct Unsubscribe(pub hyperactor_reference::PortRef<Option<MeshFailure>>);
150
/// Query the number of active supervision subscribers on this controller.
/// The count is sent back on the provided reply port.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Named, Bind, Unbind)]
pub struct GetSubscriberCount(#[binding(include)] pub hyperactor_reference::PortRef<usize>);
154
/// Check state of the actors in the mesh. This is used as a self message to
/// periodically check.
/// Stores the next time we expect to start running a check state message
/// (scheduled as "now + poll interval" by `self_check_state_message`).
/// Used to check for stalls in message handling.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Named, Bind, Unbind)]
pub struct CheckState(pub std::time::SystemTime);
161
/// The implementation of monitoring works as follows:
/// * ActorMesh and ActorMeshRef subscribe for updates from this controller,
///   which aggregates events from all owned actors.
/// * The monitor continuously polls for new events. When new events are
///   found, it sends messages to all subscribers.
/// * In addition to sending to subscribers, the owner is an automatic subscriber
///   that also has to handle the events.
#[hyperactor::export(handlers = [
    Subscribe,
    Unsubscribe,
    GetSubscriberCount,
    resource::State<ActorState>,
    resource::CreateOrUpdate<resource::mesh::Spec<()>> { cast = true },
    resource::GetState<resource::mesh::State<()>> { cast = true },
    resource::Stop { cast = true },
])]
pub struct ActorMeshController<A>
where
    A: Referable,
{
    /// The actor mesh this controller supervises.
    mesh: ActorMeshRef<A>,
    /// Human-readable name used for supervision; defaults to the mesh name
    /// when not supplied at construction.
    supervision_display_name: String,
    /// Shared health state for the monitor and responding to queries.
    health_state: HealthState,
    /// The monitor which continuously runs in the background to refresh the state
    /// of actors.
    /// If None, the actor it monitors has already stopped.
    monitor: Option<()>,
}
191
/// Implements the resource mesh behavior with unit spec/state: the
/// controller carries no spec or state payload beyond the per-rank statuses.
impl<A: Referable> resource::mesh::Mesh for ActorMeshController<A> {
    type Spec = ();
    type State = ();
}
196
197impl<A: Referable> ActorMeshController<A> {
198    /// Create a new mesh controller based on the provided reference.
199    pub(crate) fn new(
200        mesh: ActorMeshRef<A>,
201        supervision_display_name: Option<String>,
202        port: Option<hyperactor_reference::PortRef<MeshFailure>>,
203        initial_statuses: ValueMesh<resource::Status>,
204    ) -> Self {
205        let supervision_display_name =
206            supervision_display_name.unwrap_or_else(|| mesh.name().to_string());
207        Self {
208            mesh,
209            supervision_display_name,
210            health_state: HealthState::new(initial_statuses.iter().collect(), port),
211            monitor: None,
212        }
213    }
214
215    async fn stop(
216        &self,
217        cx: &impl context::Actor,
218        reason: String,
219    ) -> crate::Result<ValueMesh<resource::Status>> {
220        // Cannot use "ActorMesh::stop" as it tries to message the controller, which is this actor.
221        self.mesh
222            .proc_mesh()
223            .stop_actor_by_name(cx, self.mesh.name().clone(), reason)
224            .await
225    }
226
227    fn self_check_state_message(&self, cx: &Instance<Self>) -> Result<(), ActorError> {
228        // Only schedule a self message if the monitor has not been dropped.
229        if self.monitor.is_some() {
230            // Save when we expect the next check state message, so we can automatically
231            // detect stalls as they accumulate.
232            let delay = hyperactor_config::global::get(SUPERVISION_POLL_FREQUENCY);
233            cx.self_message_with_delay(CheckState(std::time::SystemTime::now() + delay), delay)
234        } else {
235            Ok(())
236        }
237    }
238}
239
declare_attrs! {
    /// If present in a message header, the message is from an ActorMeshController
    /// to a subscriber and can be safely dropped if it is returned as undeliverable.
    /// Set by `send_subscriber_message` and `send_heartbeat`; checked in
    /// `handle_undeliverable_message` to prune dead subscribers.
    pub attr ACTOR_MESH_SUBSCRIBER_MESSAGE: bool;
}
245
246fn send_subscriber_message(
247    cx: &impl context::Actor,
248    subscriber: &hyperactor_reference::PortRef<Option<MeshFailure>>,
249    message: MeshFailure,
250) {
251    let mut headers = Flattrs::new();
252    headers.set(ACTOR_MESH_SUBSCRIBER_MESSAGE, true);
253    if let Err(error) = subscriber.send_with_headers(cx, headers, Some(message.clone())) {
254        tracing::warn!(
255            event = %message,
256            "failed to send supervision event to subscriber {}: {}",
257            subscriber.port_id(),
258            error
259        );
260    } else {
261        tracing::info!(event = %message, "sent supervision failure message to subscriber {}", subscriber.port_id());
262    }
263}
264
impl<A: Referable> Debug for ActorMeshController<A> {
    /// Manual `Debug`: no `Debug` bound is required on `A`.
    // NOTE(review): renders the struct name as "MeshController" (not
    // "ActorMeshController") and omits `supervision_display_name` — confirm
    // both are intentional.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("MeshController")
            .field("mesh", &self.mesh)
            .field("health_state", &self.health_state)
            .field("monitor", &self.monitor)
            .finish()
    }
}
274
#[async_trait]
impl<A: Referable> Actor for ActorMeshController<A> {
    /// Start monitoring: mark the monitor live, schedule the first
    /// `CheckState` poll, and subscribe to streamed `ActorState` updates
    /// from every ProcAgent in the proc mesh.
    async fn init(&mut self, this: &Instance<Self>) -> Result<(), anyhow::Error> {
        this.set_system();
        // Start the monitor task.
        // There's a single shared monitor for the whole mesh ref. Note that
        // slices do not share the health state. This is fine because
        // requerying a slice of a mesh will still return any failed state.
        self.monitor = Some(());
        self.self_check_state_message(this)?;

        // Subscribe to streaming state updates from all ProcAgents so the
        // controller receives state changes in real time, complementing the
        // existing polling loop.
        self.mesh.proc_mesh().agent_mesh().cast(
            this,
            resource::StreamState::<ActorState> {
                name: self.mesh.name().clone(),
                // All ProcAgents send updates directly to this port
                // so that failures along the comm tree path does not
                // affect clean shutdowns.

                // Avoid binding the handle here: the controller's
                // exported ports are bound when proc_mesh installs the
                // ActorRef after spawn. Binding the same handle twice
                // panics.
                //
                // TODO(SF, 2026-03-32, T261106175): follow up in
                // hyperactor on bind semantics here. `cx.port()` plus
                // later actor-ref export currently hits `bind()` ->
                // `bind_actor_port()` on the same handle, and
                // `bind_actor_port()` still panics on an
                // already-bound handle. This workaround uses
                // `attest_message_port(...)` to avoid the eager bind,
                // but the longer-term fix is to clarify whether that
                // bind path should be idempotent and eliminate the
                // need for attestation here.
                subscriber: hyperactor_reference::PortRef::<resource::State<ActorState>>::attest_message_port(this.self_id()).unsplit(),
            },
        )?;

        let owner = if let Some(owner) = &self.health_state.owner {
            owner.to_string()
        } else {
            String::from("None")
        };
        tracing::info!(actor_id = %this.self_id(), %owner, "started mesh controller for {}", self.mesh.name());
        Ok(())
    }

    /// On shutdown: if the monitor is still live (no explicit Stop was
    /// handled), forward a stop to the proc mesh so the controlled actors
    /// don't outlive the controller.
    async fn cleanup(
        &mut self,
        this: &Instance<Self>,
        _err: Option<&ActorError>,
    ) -> Result<(), anyhow::Error> {
        // If the monitor hasn't been dropped yet, send a stop message to the
        // proc mesh.
        if self.monitor.take().is_some() {
            tracing::info!(actor_id = %this.self_id(), actor_mesh = %self.mesh.name(), "starting cleanup for ActorMeshController, stopping actor mesh");
            self.stop(this, "actor mesh controller cleanup".to_string())
                .await?;
        }
        Ok(())
    }

    /// Classify bounced messages:
    /// * subscriber notifications (tagged `ACTOR_MESH_SUBSCRIBER_MESSAGE`)
    ///   cause the dead subscriber to be dropped from the subscriber set;
    /// * cast messages (tagged `CAST_ACTOR_MESH_ID`) are logged and ignored —
    ///   the supervision polling loop will detect the underlying failure;
    /// * anything else falls through to the default handler.
    async fn handle_undeliverable_message(
        &mut self,
        cx: &Instance<Self>,
        mut envelope: Undeliverable<MessageEnvelope>,
    ) -> Result<(), anyhow::Error> {
        // Update the destination in case this was a casting message.
        envelope = update_undeliverable_envelope_for_casting(envelope);
        if let Some(true) = envelope.0.headers().get(ACTOR_MESH_SUBSCRIBER_MESSAGE) {
            // Remove from the subscriber list (if it existed) so we don't
            // send to this subscriber again.
            // NOTE: The only part of the port that is used for equality checks is
            // the port id, so create a new one just for the comparison.
            let dest_port_id = envelope.0.dest().clone();
            let port = hyperactor_reference::PortRef::<Option<MeshFailure>>::attest(dest_port_id);
            let did_exist = self.health_state.subscribers.remove(&port);
            if did_exist {
                tracing::debug!(
                    actor_id = %cx.self_id(),
                    num_subscribers = self.health_state.subscribers.len(),
                    "ActorMeshController: handle_undeliverable_message: removed subscriber {} from mesh controller",
                    port.port_id()
                );
            }
            Ok(())
        } else if envelope.0.headers().get(CAST_ACTOR_MESH_ID).is_some() {
            // A cast message we sent (e.g. StreamState or KeepaliveGetState)
            // was returned by the CommActor because it could not be forwarded.
            // This is expected when the network session is broken. Log and
            // continue — the supervision polling loop will detect the failure.
            tracing::warn!(
                actor_id = %cx.self_id(),
                dest = %envelope.0.dest(),
                "ActorMeshController: ignoring undeliverable cast message",
            );
            Ok(())
        } else {
            handle_undeliverable_message(cx, envelope)
        }
    }
}
380
381#[async_trait]
382impl<A: Referable> Handler<Subscribe> for ActorMeshController<A> {
383    async fn handle(&mut self, cx: &Context<Self>, message: Subscribe) -> anyhow::Result<()> {
384        // If we can't send a message to a subscriber, the subscriber might be gone.
385        // That shouldn't cause this actor to exit.
386        // This is handled by the handle_undeliverable_message method.
387        // If there are any crashed ranks, replay a failure event so the new
388        // subscriber learns about the current health state. We send a single
389        // message with all crashed ranks so the subscriber's filter can check
390        // overlap with its slice region. This avoids the watch-channel
391        // coalescing problem (sending per-rank messages would lose all but
392        // the last one).
393        if let Some(unhealthy) = &self.health_state.unhealthy_event {
394            let msg = match unhealthy {
395                Unhealthy::StreamClosed(msg) | Unhealthy::Crashed(msg) => msg,
396            };
397            let mut replay_msg = msg.clone();
398            replay_msg.crashed_ranks = self.health_state.crashed_ranks.keys().copied().collect();
399            send_subscriber_message(cx, &message.0, replay_msg);
400        }
401        let port_id = message.0.port_id().clone();
402        if self.health_state.subscribers.insert(message.0) {
403            tracing::debug!(actor_id = %cx.self_id(), num_subscribers = self.health_state.subscribers.len(), "added subscriber {} to mesh controller", port_id);
404        }
405        Ok(())
406    }
407}
408
409#[async_trait]
410impl<A: Referable> Handler<Unsubscribe> for ActorMeshController<A> {
411    async fn handle(&mut self, cx: &Context<Self>, message: Unsubscribe) -> anyhow::Result<()> {
412        if self.health_state.subscribers.remove(&message.0) {
413            tracing::debug!(actor_id = %cx.self_id(), num_subscribers = self.health_state.subscribers.len(), "removed subscriber {} from mesh controller", message.0.port_id());
414        }
415        Ok(())
416    }
417}
418
419#[async_trait]
420impl<A: Referable> Handler<GetSubscriberCount> for ActorMeshController<A> {
421    async fn handle(
422        &mut self,
423        cx: &Context<Self>,
424        message: GetSubscriberCount,
425    ) -> anyhow::Result<()> {
426        message.0.send(cx, self.health_state.subscribers.len())?;
427        Ok(())
428    }
429}
430
#[async_trait]
impl<A: Referable> Handler<resource::CreateOrUpdate<resource::mesh::Spec<()>>>
    for ActorMeshController<A>
{
    /// Currently a no-op as there's nothing to create or update, but allows
    /// ActorMeshController to implement the resource mesh behavior.
    async fn handle(
        &mut self,
        _cx: &Context<Self>,
        _message: resource::CreateOrUpdate<resource::mesh::Spec<()>>,
    ) -> anyhow::Result<()> {
        // Spec is `()`, so acknowledging the message is all that's required.
        Ok(())
    }
}
445
446#[async_trait]
447impl<A: Referable> Handler<resource::GetState<resource::mesh::State<()>>>
448    for ActorMeshController<A>
449{
450    async fn handle(
451        &mut self,
452        cx: &Context<Self>,
453        message: resource::GetState<resource::mesh::State<()>>,
454    ) -> anyhow::Result<()> {
455        let status = if let Some(Unhealthy::Crashed(e)) = &self.health_state.unhealthy_event {
456            resource::Status::Failed(e.to_string())
457        } else if let Some(Unhealthy::StreamClosed(_)) = &self.health_state.unhealthy_event {
458            resource::Status::Stopped
459        } else {
460            resource::Status::Running
461        };
462        let mut statuses = self
463            .health_state
464            .statuses
465            .iter()
466            .map(|(p, (s, _))| (p.clone(), s.clone()))
467            .collect::<Vec<_>>();
468        statuses.sort_by_key(|(p, _)| p.rank());
469        let statuses: ValueMesh<resource::Status> =
470            statuses
471                .into_iter()
472                .map(|(_, s)| s)
473                .collect_mesh::<ValueMesh<_>>(self.mesh.region().clone())?;
474        let state = resource::mesh::State {
475            statuses,
476            state: (),
477        };
478        message.reply.send(
479            cx,
480            resource::State {
481                name: message.name,
482                status,
483                state: Some(state),
484                generation: 0,
485                timestamp: std::time::SystemTime::now(),
486            },
487        )?;
488        Ok(())
489    }
490}
491
#[async_trait]
impl<A: Referable> Handler<resource::Stop> for ActorMeshController<A> {
    /// Handle an explicit stop request: notify subscribers that the mesh is
    /// stopping, forward the stop to the proc mesh, and record the final
    /// per-rank statuses. Idempotent — a second stop is a no-op.
    async fn handle(&mut self, cx: &Context<Self>, message: resource::Stop) -> anyhow::Result<()> {
        let mesh = &self.mesh;
        let mesh_name = mesh.name();
        tracing::info!(
            name = "ActorMeshControllerStatus",
            %mesh_name,
            reason = %message.reason,
            "stopping actor mesh"
        );
        // Run the drop on the monitor loop. The actors will not change state
        // after this point, because they will be stopped.
        // This message is idempotent because multiple stops only send out one
        // set of messages to subscribers.
        if self.monitor.take().is_none() {
            tracing::debug!(actor_id = %cx.self_id(), actor_mesh = %mesh_name, "duplicate stop request, actor mesh is already stopped");
            return Ok(());
        }
        tracing::info!(actor_id = %cx.self_id(), actor_mesh = %mesh_name, "forwarding stop request from ActorMeshController to proc mesh");

        // Let the client know that the controller has stopped. Since the monitor
        // is cancelled, it will not alert the owner or the subscribers.
        // We use a placeholder rank to get an actor id, but really there should
        // be a stop event for every rank in the mesh. Since every rank has the
        // same owner, we assume the rank doesn't matter, and the owner can just
        // assume the stop happened on all actors.
        let rank = 0usize;
        let event = ActorSupervisionEvent::new(
            // Use an actor id from the mesh.
            // NOTE(review): `unwrap` panics on an empty mesh — assumes at
            // least one rank exists. TODO confirm.
            mesh.get(rank).unwrap().actor_id().clone(),
            None,
            ActorStatus::Stopped("ActorMeshController received explicit stop request".to_string()),
            None,
        );
        let failure_message = MeshFailure {
            actor_mesh_name: Some(mesh_name.to_string()),
            event,
            crashed_ranks: vec![],
        };
        // Record the stop as StreamClosed (a non-crash unhealthy event) so a
        // later GetState reports Stopped rather than Failed.
        self.health_state.unhealthy_event = Some(Unhealthy::StreamClosed(failure_message.clone()));
        // We don't send a message to the owner on stops, because only the owner
        // can request a stop. We just send to subscribers instead, as they did
        // not request the stop themselves.
        for subscriber in self.health_state.subscribers.iter() {
            send_subscriber_message(cx, subscriber, failure_message.clone());
        }

        // max_rank and extent are only needed for the deprecated RankedValues.
        // TODO: add cmp::Ord to Point for a max() impl.
        let max_rank = self.health_state.statuses.keys().map(|p| p.rank()).max();
        let extent = self
            .health_state
            .statuses
            .keys()
            .next()
            .map(|p| p.extent().clone());
        // Send a stop message to the ProcAgent for these actors.
        match self.stop(cx, message.reason.clone()).await {
            Ok(statuses) => {
                // All stops successful, set actor status on health state.
                // Generation u64::MAX pins the final status: no later streamed
                // or polled update can dominate it.
                // NOTE(review): `rank` here is the per-point key yielded by
                // `ValueMesh::iter` (used as a `statuses` map key), despite the
                // name — confirm it is a `Point`, matching `HealthState::new`.
                for (rank, status) in statuses.iter() {
                    self.health_state
                        .statuses
                        .entry(rank)
                        .and_modify(move |s| *s = (status, u64::MAX));
                }
            }
            // An ActorStopError means some actors didn't reach the stopped state.
            Err(crate::Error::ActorStopError { statuses }) => {
                // If there are no states yet, nothing to update.
                if let Some(max_rank) = max_rank {
                    let extent = extent.expect("no actors in mesh");
                    for (rank, status) in statuses.materialized_iter(max_rank).enumerate() {
                        *self
                            .health_state
                            .statuses
                            .get_mut(&extent.point_of_rank(rank).expect("illegal rank"))
                            .unwrap() = (status.clone(), u64::MAX);
                    }
                }
            }
            // Other error types should be reported as supervision errors.
            Err(e) => {
                return Err(e.into());
            }
        }

        tracing::info!(actor_id = %cx.self_id(), actor_mesh = %mesh_name, "stopped mesh");
        Ok(())
    }
}
584
585/// Like send_state_change, but when there was no state change that occurred.
586/// Will send a None message to subscribers, and there is no state to change.
587/// Is not sent to the owner, because the owner is only watching for failures.
588/// Should be called once every so often so subscribers can discern the difference
589/// between "no messages because no errors" and "no messages because controller died".
590/// Without sending these hearbeats, subscribers will assume the mesh is dead.
591fn send_heartbeat(cx: &impl context::Actor, health_state: &HealthState) {
592    tracing::debug!(
593        num_subscribers = health_state.subscribers.len(),
594        "sending heartbeat to subscribers",
595    );
596
597    for subscriber in health_state.subscribers.iter() {
598        let mut headers = Flattrs::new();
599        headers.set(ACTOR_MESH_SUBSCRIBER_MESSAGE, true);
600        if let Err(e) = subscriber.send_with_headers(cx, headers, None) {
601            tracing::warn!(subscriber = %subscriber.port_id(), "error sending heartbeat message: {:?}", e);
602        }
603    }
604}
605
/// Sends a MeshFailure to the owner and subscribers of this mesh,
/// and changes the health state's stored unhealthy_event.
/// Owners are sent a message only for Failure events, not for Stopped events.
/// Subscribers are sent both Stopped and Failure events.
fn send_state_change(
    cx: &impl context::Actor,
    rank: usize,
    event: ActorSupervisionEvent,
    mesh_name: &Name,
    is_proc_stopped: bool,
    health_state: &mut HealthState,
) {
    // This does not include the Stopped status, which is a state that occurs when the
    // user calls stop() on a proc or actor mesh.
    let is_failed = event.is_error();
    if is_failed {
        tracing::warn!(
            name = "SupervisionEvent",
            actor_mesh = %mesh_name,
            %event,
            "detected supervision error on monitored mesh: name={mesh_name}",
        );
    } else {
        tracing::debug!(
            name = "SupervisionEvent",
            actor_mesh = %mesh_name,
            %event,
            "detected non-error supervision event on monitored mesh: name={mesh_name}",
        );
    }

    // Record the failing rank and classify the overall unhealthy state:
    // a stopped proc closes the stream; anything else counts as a crash.
    let failure_message = MeshFailure {
        actor_mesh_name: Some(mesh_name.to_string()),
        event: event.clone(),
        crashed_ranks: vec![rank],
    };
    health_state.crashed_ranks.insert(rank, event.clone());
    health_state.unhealthy_event = Some(if is_proc_stopped {
        Unhealthy::StreamClosed(failure_message.clone())
    } else {
        Unhealthy::Crashed(failure_message.clone())
    });
    // Send a notification to the owning actor of this mesh, if there is one.
    // Don't send a message to the owner for non-failure events such as "stopped".
    // Those events are always initiated by the owner, who don't need to be
    // told that they were stopped.
    if is_failed {
        if let Some(owner) = &health_state.owner {
            if let Err(error) = owner.send(cx, failure_message.clone()) {
                // NOTE(review): `error` appears both as a structured field and
                // in the format args — confirm the duplication is intentional.
                tracing::warn!(
                    name = "SupervisionEvent",
                    actor_mesh = %mesh_name,
                    %event,
                    %error,
                    "failed to send supervision event to owner {}: {}. dropping event",
                    owner.port_id(),
                    error
                );
            } else {
                tracing::info!(actor_mesh = %mesh_name, %event, "sent supervision failure message to owner {}", owner.port_id());
            }
        }
    }
    // Subscribers get all messages, even for non-failures like Stopped, because
    // they need to know if the owner stopped the mesh.
    for subscriber in health_state.subscribers.iter() {
        send_subscriber_message(cx, subscriber, failure_message.clone());
    }
}
675
676fn actor_state_to_supervision_events(
677    state: resource::State<ActorState>,
678) -> (usize, Vec<ActorSupervisionEvent>) {
679    let (rank, actor_id, events) = match state.state {
680        Some(inner) => (
681            inner.create_rank,
682            Some(inner.actor_id),
683            inner.supervision_events.clone(),
684        ),
685        None => (0, None, vec![]),
686    };
687    let events = match state.status {
688        // If the actor was killed, it might not have a Failed status
689        // or supervision events, and it can't tell us which rank
690        resource::Status::NotExist | resource::Status::Stopped | resource::Status::Timeout(_) => {
691            // it was.
692            if !events.is_empty() {
693                events
694            } else {
695                vec![ActorSupervisionEvent::new(
696                    actor_id.expect("actor_id is None"),
697                    None,
698                    ActorStatus::Stopped(
699                        format!(
700                            "actor status is {}; actor may have been killed",
701                            state.status
702                        )
703                        .to_string(),
704                    ),
705                    None,
706                )]
707            }
708        }
709        resource::Status::Failed(_) => events,
710        // All other states are successful.
711        _ => vec![],
712    };
713    (rank, events)
714}
715
716/// Map a process-level [`ProcStatus`] to an actor-level [`ActorStatus`].
717///
718/// When the supervision poll discovers that a process is terminating, this
719/// function decides whether to treat it as a clean stop or a failure.
720/// Notably, [`ProcStatus::Stopping`] (SIGTERM sent, process not yet exited)
721/// is mapped to [`ActorStatus::Stopped`] rather than [`ActorStatus::Failed`]
722/// so that a graceful shutdown in progress does not trigger unhandled
723/// supervision errors.
724fn proc_status_to_actor_status(proc_status: Option<ProcStatus>) -> ActorStatus {
725    match proc_status {
726        Some(ProcStatus::Stopped { exit_code: 0, .. }) => {
727            ActorStatus::Stopped("process exited cleanly".to_string())
728        }
729        Some(ProcStatus::Stopped { exit_code, .. }) => {
730            ActorStatus::Failed(ActorErrorKind::Generic(format!(
731                "the process this actor was running on exited with non-zero code {}",
732                exit_code
733            )))
734        }
735        // Stopping is a transient state during graceful shutdown. Treat it the
736        // same as a clean stop rather than a failure.
737        Some(ProcStatus::Stopping { .. }) => {
738            ActorStatus::Stopped("process is stopping".to_string())
739        }
740        // Conservatively treat lack of status as stopped
741        None => ActorStatus::Stopped("no status received from process".to_string()),
742        Some(status) => ActorStatus::Failed(ActorErrorKind::Generic(format!(
743            "the process this actor was running on failed: {}",
744            status
745        ))),
746    }
747}
748
749#[async_trait]
750impl<A: Referable> Handler<resource::State<ActorState>> for ActorMeshController<A> {
751    async fn handle(
752        &mut self,
753        cx: &Context<Self>,
754        state: resource::State<ActorState>,
755    ) -> anyhow::Result<()> {
756        let (rank, events) = actor_state_to_supervision_events(state.clone());
757        let point = self.mesh.region().extent().point_of_rank(rank)?;
758
759        let changed = self
760            .health_state
761            .maybe_update(point, state.status, state.generation);
762
763        if changed && !events.is_empty() {
764            send_state_change(
765                cx,
766                rank,
767                events[0].clone(),
768                self.mesh.name(),
769                false,
770                &mut self.health_state,
771            );
772        }
773
774        // Once every rank has begun terminating (Stopping or beyond),
775        // the monitor is no longer needed.
776        if self
777            .health_state
778            .statuses
779            .values()
780            .all(|(s, _)| s.is_terminating())
781        {
782            self.monitor.take();
783        }
784        Ok(())
785    }
786}
787
788fn format_system_time(time: std::time::SystemTime) -> String {
789    let datetime: chrono::DateTime<chrono::Local> = time.into();
790    datetime.format("%Y-%m-%d %H:%M:%S").to_string()
791}
792
793#[async_trait]
794impl<A: Referable> Handler<CheckState> for ActorMeshController<A> {
795    /// Checks actor states and reschedules as a self-message.
796    ///
797    /// When any actor in this mesh changes state,
798    /// including once for the initial state of all actors, send a message to the
799    /// owners and subscribers of this mesh.
800    /// The receivers will get a MeshFailure. The created rank is
801    /// the original rank of the actor on the mesh, not the rank after
802    /// slicing.
803    ///
804    /// * SUPERVISION_POLL_FREQUENCY controls how frequently to poll.
805    /// * self-messaging stops when self.monitor is set to None.
806    async fn handle(
807        &mut self,
808        cx: &Context<Self>,
809        CheckState(expected_time): CheckState,
810    ) -> Result<(), anyhow::Error> {
811        // A delayed CheckState may arrive after Stop has already dropped
812        // the monitor. Discard it — there is nothing left to poll.
813        if self.monitor.is_none() {
814            return Ok(());
815        }
816
817        // This implementation polls every "time_between_checks" duration, checking
818        // for changes in the actor states. It can be improved in two ways:
819        // 1. Use accumulation, to get *any* actor with a change in state, not *all*
820        //    actors.
821        // 2. Use a push-based mode instead of polling.
822        // Wait in between checking to avoid using too much network.
823
824        // Check for stalls in the supervision loop. These delays can cause the
825        // subscribers to think the controller is dead.
826        // Allow a little slack time to avoid logging for innocuous delays.
827        // If it's greater than 2x the expected time, log a warning.
828        if std::time::SystemTime::now()
829            > expected_time + hyperactor_config::global::get(SUPERVISION_POLL_FREQUENCY)
830        {
831            // Current time is included by default in the log message.
832            let expected_time = format_system_time(expected_time);
833            // Track in both metrics and tracing.
834            ACTOR_MESH_CONTROLLER_SUPERVISION_STALLS.add(1, kv_pairs!("actor_id" => cx.self_id().to_string(), "expected_time" => expected_time.clone()));
835            tracing::warn!(
836                actor_id = %cx.self_id(),
837                "Handler<CheckState> is being stalled, expected at {}",
838                expected_time,
839            );
840        }
841        let mesh = &self.mesh;
842        let supervision_display_name = &self.supervision_display_name;
843        // First check if the proc mesh is dead before trying to query their agents.
844        let proc_states = mesh.proc_mesh().proc_states(cx).await;
845        if let Err(e) = proc_states {
846            send_state_change(
847                cx,
848                0,
849                ActorSupervisionEvent::new(
850                    cx.self_id().clone(),
851                    None,
852                    ActorStatus::generic_failure(format!(
853                        "unable to query for proc states: {:?}",
854                        e
855                    )),
856                    None,
857                ),
858                mesh.name(),
859                false,
860                &mut self.health_state,
861            );
862            self.self_check_state_message(cx)?;
863            return Ok(());
864        }
865        if let Some(proc_states) = proc_states.unwrap() {
866            // Check if the proc mesh is still alive.
867            if let Some((point, state)) = proc_states
868                .iter()
869                .find(|(_rank, state)| state.status.is_terminating())
870            {
871                // TODO: allow "actor supervision event" to be general, and
872                // make the proc failure the cause. It is a hack to try to determine
873                // the correct status based on process exit status.
874                let actor_status =
875                    proc_status_to_actor_status(state.state.and_then(|s| s.proc_status));
876                let display_name = crate::actor_display_name(supervision_display_name, &point);
877                send_state_change(
878                    cx,
879                    point.rank(),
880                    ActorSupervisionEvent::new(
881                        // Attribute this to the monitored actor, even if the underlying
882                        // cause is a proc_failure. We propagate the cause explicitly.
883                        mesh.get(point.rank()).unwrap().actor_id().clone(),
884                        Some(display_name),
885                        actor_status,
886                        None,
887                    ),
888                    mesh.name(),
889                    true,
890                    &mut self.health_state,
891                );
892                self.self_check_state_message(cx)?;
893                return Ok(());
894            }
895        }
896
897        // Now that we know the proc mesh is alive, check for actor state changes.
898        let orphan_timeout = hyperactor_config::global::get(MESH_ORPHAN_TIMEOUT);
899        let keepalive = if orphan_timeout.is_zero() {
900            None
901        } else {
902            Some(std::time::SystemTime::now() + orphan_timeout)
903        };
904        let events = mesh.actor_states_with_keepalive(cx, keepalive).await;
905        if let Err(e) = events {
906            send_state_change(
907                cx,
908                0,
909                ActorSupervisionEvent::new(
910                    cx.self_id().clone(),
911                    Some(supervision_display_name.clone()),
912                    ActorStatus::generic_failure(format!(
913                        "unable to query for actor states: {:?}",
914                        e
915                    )),
916                    None,
917                ),
918                mesh.name(),
919                false,
920                &mut self.health_state,
921            );
922            self.self_check_state_message(cx)?;
923            return Ok(());
924        }
925        // If there was any state change, we don't need to send a heartbeat.
926        let mut did_send_state_change = false;
927        // True if any rank is terminating (Stopping or beyond). Once set,
928        // no more heartbeats are sent.
929        let mut any_terminating = false;
930        // This returned point is the created rank, *not* the rank of
931        // the possibly sliced input mesh.
932        for (point, state) in events.unwrap().iter() {
933            let changed = self.health_state.maybe_update(
934                point.clone(),
935                state.status.clone(),
936                state.generation,
937            );
938            if !any_terminating {
939                if let Some((s, _)) = self.health_state.statuses.get(&point) {
940                    if s.is_terminating() {
941                        any_terminating = true;
942                    }
943                }
944            }
945            if !changed {
946                continue;
947            }
948            let (rank, events) = actor_state_to_supervision_events(state.clone());
949            if events.is_empty() {
950                continue;
951            }
952            did_send_state_change = true;
953            send_state_change(
954                cx,
955                rank,
956                events[0].clone(),
957                mesh.name(),
958                false,
959                &mut self.health_state,
960            );
961        }
962        if !did_send_state_change && !any_terminating {
963            // No state change, but subscribers need to be sent a message
964            // every so often so they know the controller is still alive.
965            // Send a "no state change" message.
966            // Only if the last state for any actor in this mesh is not a terminal state.
967            send_heartbeat(cx, &self.health_state);
968        }
969
970        // Once every rank has begun terminating, no further state changes
971        // are possible — stop polling and drop the monitor.
972        let all_terminating = self
973            .health_state
974            .statuses
975            .values()
976            .all(|(s, _)| s.is_terminating());
977        if !all_terminating {
978            // Schedule a self send after a waiting period.
979            self.self_check_state_message(cx)?;
980        } else {
981            // There's no need to send a stop message during cleanup if all
982            // ranks are already terminating.
983            self.monitor.take();
984        }
985        return Ok(());
986    }
987}
988
#[derive(Debug)]
#[hyperactor::export]
pub(crate) struct ProcMeshController {
    /// The proc mesh this controller manages; its procs are stopped via the
    /// owning hosts during this actor's cleanup.
    mesh: ProcMeshRef,
}
994
995impl ProcMeshController {
996    /// Create a new proc controller based on the provided reference.
997    pub(crate) fn new(mesh: ProcMeshRef) -> Self {
998        Self { mesh }
999    }
1000}
1001
1002#[async_trait]
1003impl Actor for ProcMeshController {
1004    async fn init(&mut self, this: &Instance<Self>) -> Result<(), anyhow::Error> {
1005        this.set_system();
1006        Ok(())
1007    }
1008
1009    async fn cleanup(
1010        &mut self,
1011        this: &Instance<Self>,
1012        _err: Option<&ActorError>,
1013    ) -> Result<(), anyhow::Error> {
1014        // Cannot use "ProcMesh::stop" as it's only defined on ProcMesh, not ProcMeshRef.
1015        let names = self
1016            .mesh
1017            .proc_ids()
1018            .collect::<Vec<hyperactor_reference::ProcId>>();
1019        let region = self.mesh.region().clone();
1020        if let Some(hosts) = self.mesh.hosts() {
1021            hosts
1022                .stop_proc_mesh(
1023                    this,
1024                    self.mesh.name(),
1025                    names,
1026                    region,
1027                    "proc mesh controller cleanup".to_string(),
1028                )
1029                .await
1030        } else {
1031            Ok(())
1032        }
1033    }
1034}
1035
#[derive(Debug)]
#[hyperactor::export]
pub(crate) struct HostMeshController {
    /// The host mesh this controller manages; each host is shut down during
    /// this actor's cleanup.
    mesh: HostMeshRef,
}
1041
1042impl HostMeshController {
1043    /// Create a new host controller based on the provided reference.
1044    pub(crate) fn new(mesh: HostMeshRef) -> Self {
1045        Self { mesh }
1046    }
1047}
1048
1049#[async_trait]
1050impl Actor for HostMeshController {
1051    async fn init(&mut self, this: &Instance<Self>) -> Result<(), anyhow::Error> {
1052        this.set_system();
1053        Ok(())
1054    }
1055
1056    async fn cleanup(
1057        &mut self,
1058        this: &Instance<Self>,
1059        _err: Option<&ActorError>,
1060    ) -> Result<(), anyhow::Error> {
1061        // Cannot use "HostMesh::shutdown" as it's only defined on HostMesh, not HostMeshRef.
1062        for host in self.mesh.values() {
1063            if let Err(e) = host.shutdown(this).await {
1064                tracing::warn!(host = %host, error = %e, "host shutdown failed");
1065            }
1066        }
1067        Ok(())
1068    }
1069}
1070
1071#[cfg(test)]
1072mod tests {
1073    use std::ops::Deref;
1074    use std::time::Duration;
1075
1076    use hyperactor::actor::ActorStatus;
1077    use ndslice::Extent;
1078    use ndslice::ViewExt;
1079
1080    use super::SUPERVISION_POLL_FREQUENCY;
1081    use super::proc_status_to_actor_status;
1082    use crate::ActorMesh;
1083    use crate::Name;
1084    use crate::bootstrap::ProcStatus;
1085    use crate::proc_agent::MESH_ORPHAN_TIMEOUT;
1086    use crate::resource;
1087    use crate::supervision::MeshFailure;
1088    use crate::test_utils::local_host_mesh;
1089    use crate::testactor;
1090    use crate::testing;
1091
    /// Verify that actors spawned without a controller are cleaned up
    /// when their keepalive expiry lapses. We:
    ///   1. Enable the orphan timeout on the `ProcMeshAgent`.
    ///   2. Spawn actors as *system actors* (no `ActorMeshController`).
    ///   3. Send a single keepalive with a short expiry time.
    ///   4. Wait for the expiry to pass and `SelfCheck` to fire.
    ///   5. Assert that the actors are now stopped.
    #[tokio::test]
    async fn test_orphaned_actors_are_cleaned_up() {
        // Hold the global config lock while overriding; the `_orphan` guard
        // keeps the override active for the scope of the test.
        let config = hyperactor_config::global::lock();
        // Short orphan timeout so SelfCheck fires frequently.
        let _orphan = config.override_key(MESH_ORPHAN_TIMEOUT, Duration::from_secs(1));

        let instance = testing::instance();
        // Two local hosts, one proc on each (Extent::unity()).
        let host_mesh = local_host_mesh(2).await;
        let proc_mesh = host_mesh
            .spawn(instance, "test", Extent::unity(), None)
            .await
            .unwrap();

        let actor_name = Name::new("orphan_test").unwrap();
        // Spawn as a system actor so no controller is created. This lets us
        // control keepalive messages directly without the controller
        // interfering.
        let actor_mesh: ActorMesh<testactor::TestActor> = proc_mesh
            .spawn_with_name(instance, actor_name.clone(), &(), None, true)
            .await
            .unwrap();
        assert!(
            actor_mesh.deref().extent().num_ranks() > 0,
            "should have spawned at least one actor"
        );

        // Send a keepalive with a short expiry. This is what the
        // ActorMeshController would normally do on each supervision poll.
        let states = proc_mesh
            .actor_states_with_keepalive(
                instance,
                actor_name.clone(),
                Some(std::time::SystemTime::now() + Duration::from_secs(2)),
            )
            .await
            .unwrap();
        // All actors should be running right now.
        for state in states.values() {
            assert_eq!(
                state.status,
                resource::Status::Running,
                "actor should be running before expiry"
            );
        }

        // Wait long enough for the expiry to pass and at least one
        // SelfCheck cycle to fire. With MESH_ORPHAN_TIMEOUT = 1s and
        // expiry in 2s, by around 4s at least two SelfCheck cycles will
        // have elapsed after the expiry.
        tokio::time::sleep(Duration::from_secs(5)).await;

        // Query again, this time *without* a keepalive so we don't
        // extend the expiry.
        let states = proc_mesh
            .actor_states(instance, actor_name.clone())
            .await
            .unwrap();
        for state in states.values() {
            assert_eq!(
                state.status,
                resource::Status::Stopped,
                "actor should be stopped after keepalive expiry"
            );
        }
    }
1164
    /// Create a multi-process host mesh that propagates the current
    /// process's config overrides to child processes via Bootstrap.
    ///
    /// Spawns `n` bootstrap child processes, one per unix-channel address,
    /// and returns a shutdown guard wrapping the resulting host mesh.
    #[cfg(fbcode_build)]
    async fn host_mesh_with_config(n: usize) -> crate::host_mesh::HostMeshShutdownGuard {
        use hyperactor::channel::ChannelTransport;
        use tokio::process::Command;

        let program = crate::testresource::get("monarch/hyperactor_mesh/bootstrap");
        // One unix-transport channel address per requested host.
        let mut host_addrs = vec![];
        for _ in 0..n {
            host_addrs.push(ChannelTransport::Unix.any());
        }

        for host in host_addrs.iter() {
            let mut cmd = Command::new(program.clone());
            // Current-process config overrides are carried to the child via
            // `config: Some(...)` and serialized into its environment below.
            let boot = crate::Bootstrap::Host {
                addr: host.clone(),
                command: None,
                config: Some(hyperactor_config::global::attrs()),
                exit_on_shutdown: false,
            };
            boot.to_env(&mut cmd);
            cmd.kill_on_drop(false);
            // SAFETY: pre_exec sets PR_SET_PDEATHSIG so the child is
            // cleaned up if the parent (test) process dies.
            unsafe {
                cmd.pre_exec(crate::bootstrap::install_pdeathsig_kill);
            }
            cmd.spawn().unwrap();
        }

        let host_mesh = crate::HostMeshRef::from_hosts(Name::new("test").unwrap(), host_addrs);
        crate::host_mesh::HostMesh::take(host_mesh).shutdown_guard()
    }
1199
    /// Verify that actors are cleaned up via the orphan timeout when the
    /// `ActorMeshController`'s process crashes. Unlike the system-actor test
    /// above, this spawns actors through a real controller (via `WrapperActor`)
    /// and then kills the controller's process uncleanly with `ProcessExit`.
    /// The agents on the surviving proc mesh detect the expired keepalive
    /// and stop the actors.
    #[tokio::test]
    #[cfg(fbcode_build)]
    async fn test_orphaned_actors_cleaned_up_on_controller_crash() {
        // Hold the config lock; the `_orphan`/`_poll` guards keep the
        // overrides active for the scope of the test.
        let config = hyperactor_config::global::lock();
        let _orphan = config.override_key(MESH_ORPHAN_TIMEOUT, Duration::from_secs(2));
        let _poll = config.override_key(SUPERVISION_POLL_FREQUENCY, Duration::from_secs(1));

        let instance = testing::instance();
        let num_replicas = 2;

        // Host mesh for the test actors (these survive the crash).
        // host_mesh_with_config propagates config overrides to child
        // processes via Bootstrap, so agents boot with
        // MESH_ORPHAN_TIMEOUT=2s and start the SelfCheck loop.
        let mut actor_hm = host_mesh_with_config(num_replicas).await;
        let actor_proc_mesh = actor_hm
            .spawn(instance, "actors", Extent::unity(), None)
            .await
            .unwrap();

        // Host mesh for the wrapper + controller (will be killed).
        let mut controller_hm = host_mesh_with_config(1).await;
        let controller_proc_mesh = controller_hm
            .spawn(instance, "controller", Extent::unity(), None)
            .await
            .unwrap();

        let child_name = Name::new("orphan_child").unwrap();

        // Supervision port required by WrapperActor params.
        let (supervision_port, _supervision_receiver) = instance.open_port::<MeshFailure>();
        let supervisor = supervision_port.bind();

        // Spawn WrapperActor on controller_proc_mesh. Its init() spawns
        // ActorMesh<TestActor> on actor_proc_mesh with a real
        // ActorMeshController co-located on the controller's process.
        let wrapper_mesh: ActorMesh<testactor::WrapperActor> = controller_proc_mesh
            .spawn(
                instance,
                "wrapper",
                &(
                    actor_proc_mesh.deref().clone(),
                    supervisor,
                    child_name.clone(),
                ),
            )
            .await
            .unwrap();

        // Give the controller time to run at least one CheckState cycle
        // (polling every 1s) so it sends KeepaliveGetState to the agents.
        tokio::time::sleep(Duration::from_secs(3)).await;

        // Verify actors are running before the crash.
        let states = actor_proc_mesh
            .actor_states(instance, child_name.clone())
            .await
            .unwrap();
        for state in states.values() {
            assert_eq!(
                state.status,
                resource::Status::Running,
                "actor should be running before controller crash"
            );
        }

        // Kill the controller's process uncleanly. send_to_children: false
        // means only the WrapperActor's process exits; the TestActors on
        // actor_proc_mesh survive.
        wrapper_mesh
            .cast(
                instance,
                testactor::CauseSupervisionEvent {
                    kind: testactor::SupervisionEventType::ProcessExit(1),
                    send_to_children: false,
                },
            )
            .unwrap();

        // Wait for:
        //  - keepalive expiry (2s from last CheckState)
        //  - at least one SelfCheck cycle (every 2s)
        //  - margin for processing
        tokio::time::sleep(Duration::from_secs(8)).await;

        // Actors should now be stopped via the orphan timeout.
        let states = actor_proc_mesh
            .actor_states(instance, child_name.clone())
            .await
            .unwrap();
        for state in states.values() {
            assert_eq!(
                state.status,
                resource::Status::Stopped,
                "actor should be stopped after controller crash and orphan timeout"
            );
        }

        // Best-effort teardown; the assertions above have already run.
        let _ = actor_hm.shutdown(instance).await;
        let _ = controller_hm.shutdown(instance).await;
    }
1307
1308    #[test]
1309    fn test_proc_status_to_actor_status_stopped_cleanly() {
1310        let status = proc_status_to_actor_status(Some(ProcStatus::Stopped {
1311            exit_code: 0,
1312            stderr_tail: vec![],
1313        }));
1314        assert!(
1315            matches!(status, ActorStatus::Stopped(ref msg) if msg.contains("cleanly")),
1316            "expected Stopped, got {:?}",
1317            status
1318        );
1319    }
1320
1321    #[test]
1322    fn test_proc_status_to_actor_status_nonzero_exit() {
1323        let status = proc_status_to_actor_status(Some(ProcStatus::Stopped {
1324            exit_code: 1,
1325            stderr_tail: vec![],
1326        }));
1327        assert!(
1328            matches!(status, ActorStatus::Failed(_)),
1329            "expected Failed, got {:?}",
1330            status
1331        );
1332    }
1333
1334    #[test]
1335    fn test_proc_status_to_actor_status_stopping_is_not_a_failure() {
1336        let status = proc_status_to_actor_status(Some(ProcStatus::Stopping {
1337            started_at: std::time::SystemTime::now(),
1338        }));
1339        assert!(
1340            matches!(status, ActorStatus::Stopped(ref msg) if msg.contains("stopping")),
1341            "expected Stopped, got {:?}",
1342            status
1343        );
1344    }
1345
1346    #[test]
1347    fn test_proc_status_to_actor_status_none() {
1348        let status = proc_status_to_actor_status(None);
1349        assert!(
1350            matches!(status, ActorStatus::Stopped(_)),
1351            "expected Stopped, got {:?}",
1352            status
1353        );
1354    }
1355
1356    #[test]
1357    fn test_proc_status_to_actor_status_killed() {
1358        let status = proc_status_to_actor_status(Some(ProcStatus::Killed {
1359            signal: 9,
1360            core_dumped: false,
1361        }));
1362        assert!(
1363            matches!(status, ActorStatus::Failed(_)),
1364            "expected Failed, got {:?}",
1365            status
1366        );
1367    }
1368
1369    #[test]
1370    fn test_proc_status_to_actor_status_failed() {
1371        let status = proc_status_to_actor_status(Some(ProcStatus::Failed {
1372            reason: "oom".to_string(),
1373        }));
1374        assert!(
1375            matches!(status, ActorStatus::Failed(_)),
1376            "expected Failed, got {:?}",
1377            status
1378        );
1379    }
1380}