hyperactor_mesh/
supervision.rs

/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 * All rights reserved.
 *
 * This source code is licensed under the BSD-style license found in the
 * LICENSE file in the root directory of this source tree.
 */

//! Messages used in supervision of actor meshes.
10
11use hyperactor::Bind;
12use hyperactor::Unbind;
13use hyperactor::actor::ActorErrorKind;
14use hyperactor::actor::ActorStatus;
15use hyperactor::context;
16use hyperactor::supervision::ActorSupervisionEvent;
17use serde::Deserialize;
18use serde::Serialize;
19use typeuri::Named;
20
/// Message about a supervision failure on a mesh of actors instead of a single
/// actor.
///
/// Carries the underlying [`ActorSupervisionEvent`] together with the mesh it
/// came from and the set of affected ranks. It derives `Serialize`/`Deserialize`
/// and is registered via `wirevalue::register_type!` immediately after the
/// definition.
///
/// NOTE(review): serde's derive serializes fields in declaration order; avoid
/// reordering fields if a positional (non-self-describing) wire format is used.
#[derive(Clone, Debug, Serialize, Deserialize, Named, PartialEq, Bind, Unbind)]
pub struct MeshFailure {
    /// Name of the mesh which the event originated from, or `None` when it is
    /// not available.
    pub actor_mesh_name: Option<String>,
    /// The supervision event on an actor located at mesh + rank.
    pub event: ActorSupervisionEvent,
    /// The set of crashed ranks in the mesh. Empty means the event
    /// applies to the whole mesh (e.g. mesh stop, controller timeout).
    /// See `MeshFailure::contains_rank` for the membership rule this implies.
    pub crashed_ranks: Vec<usize>,
}
// Register `MeshFailure` with the `wirevalue` type registry.
// NOTE(review): presumably needed for wire (de)serialization — confirm.
wirevalue::register_type!(MeshFailure);
34
35impl MeshFailure {
36    /// Returns true if the given rank is part of this failure.
37    /// A whole-mesh event (empty crashed_ranks) contains every rank.
38    pub fn contains_rank(&self, rank: usize) -> bool {
39        self.crashed_ranks.is_empty() || self.crashed_ranks.contains(&rank)
40    }
41
42    /// Helper function to handle a message to an actor that just wants to forward
43    /// it to the next owner.
44    pub fn default_handler(&self, cx: &impl context::Actor) -> Result<(), anyhow::Error> {
45        // If an actor spawned by this one fails, we can't handle it. We fail
46        // ourselves with a chained error and bubble up to the next owner.
47        let err = ActorErrorKind::UnhandledSupervisionEvent(Box::new(ActorSupervisionEvent::new(
48            cx.instance().self_id().clone(),
49            None,
50            ActorStatus::Failed(ActorErrorKind::UnhandledSupervisionEvent(Box::new(
51                self.event.clone(),
52            ))),
53            None,
54        )));
55        Err(anyhow::Error::new(err))
56    }
57}
58
59impl std::fmt::Display for MeshFailure {
60    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
61        let actor_mesh_name = self
62            .actor_mesh_name
63            .as_ref()
64            .map(|m| format!(" on mesh \"{}\"", m))
65            .unwrap_or("".to_string());
66        let ranks = if self.crashed_ranks.is_empty() {
67            String::new()
68        } else {
69            format!(" at ranks {:?}", self.crashed_ranks)
70        };
71        write!(
72            f,
73            "failure{}{} with event: {}",
74            actor_mesh_name, ranks, self.event
75        )
76    }
77}
78
/// Describes why a mesh is unhealthy. Shared between mesh types.
///
/// Each variant carries the [`MeshFailure`] associated with the condition.
#[derive(Debug, Clone)]
pub(crate) enum Unhealthy {
    /// The event stream closed.
    StreamClosed(MeshFailure),
    /// A bad health event was received.
    Crashed(MeshFailure),
}