hyperactor_mesh/v1/
mesh_controller.rs

/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 * All rights reserved.
 *
 * This source code is licensed under the BSD-style license found in the
 * LICENSE file in the root directory of this source tree.
 */
8
9use std::fmt::Debug;
10
11use async_trait::async_trait;
12use hyperactor::Actor;
13use hyperactor::Instance;
14use hyperactor::ProcId;
15use hyperactor::actor::ActorError;
16use hyperactor::actor::Referable;
17use ndslice::ViewExt;
18use ndslice::view::Ranked;
19
20use crate::v1::actor_mesh::ActorMeshRef;
21use crate::v1::host_mesh::HostMeshRef;
22use crate::v1::proc_mesh::ProcMeshRef;
23
24#[hyperactor::export(spawn = false)]
25pub(crate) struct ActorMeshController<A>
26where
27    A: Referable,
28{
29    mesh: ActorMeshRef<A>,
30}
31
32impl<A: Referable> Debug for ActorMeshController<A> {
33    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
34        f.debug_struct("MeshController")
35            .field("mesh", &self.mesh)
36            .finish()
37    }
38}
39
40#[async_trait]
41impl<A: Referable> Actor for ActorMeshController<A> {
42    type Params = ActorMeshRef<A>;
43    async fn new(params: Self::Params) -> Result<Self, anyhow::Error> {
44        Ok(Self { mesh: params })
45    }
46
47    async fn cleanup(
48        &mut self,
49        this: &Instance<Self>,
50        _err: Option<&ActorError>,
51    ) -> Result<(), anyhow::Error> {
52        // Cannot use "ActorMesh::stop" as it's only defined on ActorMesh, not ActorMeshRef.
53        self.mesh
54            .proc_mesh()
55            .stop_actor_by_name(this, self.mesh.name().clone())
56            .await?;
57        Ok(())
58    }
59}
60
61#[derive(Debug)]
62#[hyperactor::export(spawn = true)]
63pub(crate) struct ProcMeshController {
64    mesh: ProcMeshRef,
65}
66
67#[async_trait]
68impl Actor for ProcMeshController {
69    type Params = ProcMeshRef;
70    async fn new(params: Self::Params) -> Result<Self, anyhow::Error> {
71        Ok(Self { mesh: params })
72    }
73
74    async fn cleanup(
75        &mut self,
76        this: &Instance<Self>,
77        _err: Option<&ActorError>,
78    ) -> Result<(), anyhow::Error> {
79        // Cannot use "ProcMesh::stop" as it's only defined on ProcMesh, not ProcMeshRef.
80        let names = self.mesh.proc_ids().collect::<Vec<ProcId>>();
81        let region = self.mesh.region().clone();
82        if let Some(hosts) = self.mesh.hosts() {
83            hosts
84                .stop_proc_mesh(this, self.mesh.name(), names, region)
85                .await
86        } else {
87            Ok(())
88        }
89    }
90}
91
92#[derive(Debug)]
93#[hyperactor::export(spawn = true)]
94pub(crate) struct HostMeshController {
95    mesh: HostMeshRef,
96}
97
98#[async_trait]
99impl Actor for HostMeshController {
100    type Params = HostMeshRef;
101    async fn new(params: Self::Params) -> Result<Self, anyhow::Error> {
102        Ok(Self { mesh: params })
103    }
104
105    async fn cleanup(
106        &mut self,
107        this: &Instance<Self>,
108        _err: Option<&ActorError>,
109    ) -> Result<(), anyhow::Error> {
110        // Cannot use "HostMesh::shutdown" as it's only defined on HostMesh, not HostMeshRef.
111        for host in self.mesh.values() {
112            if let Err(e) = host.shutdown(this).await {
113                tracing::warn!(host = %host, error = %e, "host shutdown failed");
114            }
115        }
116        Ok(())
117    }
118}