hyperactor_mesh/v1/
mesh_controller.rs1use std::fmt::Debug;
10
11use async_trait::async_trait;
12use hyperactor::Actor;
13use hyperactor::Instance;
14use hyperactor::ProcId;
15use hyperactor::actor::ActorError;
16use hyperactor::actor::Referable;
17use ndslice::ViewExt;
18use ndslice::view::Ranked;
19
20use crate::v1::actor_mesh::ActorMeshRef;
21use crate::v1::host_mesh::HostMeshRef;
22use crate::v1::proc_mesh::ProcMeshRef;
23
24#[hyperactor::export(spawn = false)]
25pub(crate) struct ActorMeshController<A>
26where
27 A: Referable,
28{
29 mesh: ActorMeshRef<A>,
30}
31
32impl<A: Referable> Debug for ActorMeshController<A> {
33 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
34 f.debug_struct("MeshController")
35 .field("mesh", &self.mesh)
36 .finish()
37 }
38}
39
40#[async_trait]
41impl<A: Referable> Actor for ActorMeshController<A> {
42 type Params = ActorMeshRef<A>;
43 async fn new(params: Self::Params) -> Result<Self, anyhow::Error> {
44 Ok(Self { mesh: params })
45 }
46
47 async fn cleanup(
48 &mut self,
49 this: &Instance<Self>,
50 _err: Option<&ActorError>,
51 ) -> Result<(), anyhow::Error> {
52 self.mesh
54 .proc_mesh()
55 .stop_actor_by_name(this, self.mesh.name().clone())
56 .await?;
57 Ok(())
58 }
59}
60
61#[derive(Debug)]
62#[hyperactor::export(spawn = true)]
63pub(crate) struct ProcMeshController {
64 mesh: ProcMeshRef,
65}
66
67#[async_trait]
68impl Actor for ProcMeshController {
69 type Params = ProcMeshRef;
70 async fn new(params: Self::Params) -> Result<Self, anyhow::Error> {
71 Ok(Self { mesh: params })
72 }
73
74 async fn cleanup(
75 &mut self,
76 this: &Instance<Self>,
77 _err: Option<&ActorError>,
78 ) -> Result<(), anyhow::Error> {
79 let names = self.mesh.proc_ids().collect::<Vec<ProcId>>();
81 let region = self.mesh.region().clone();
82 if let Some(hosts) = self.mesh.hosts() {
83 hosts
84 .stop_proc_mesh(this, self.mesh.name(), names, region)
85 .await
86 } else {
87 Ok(())
88 }
89 }
90}
91
92#[derive(Debug)]
93#[hyperactor::export(spawn = true)]
94pub(crate) struct HostMeshController {
95 mesh: HostMeshRef,
96}
97
98#[async_trait]
99impl Actor for HostMeshController {
100 type Params = HostMeshRef;
101 async fn new(params: Self::Params) -> Result<Self, anyhow::Error> {
102 Ok(Self { mesh: params })
103 }
104
105 async fn cleanup(
106 &mut self,
107 this: &Instance<Self>,
108 _err: Option<&ActorError>,
109 ) -> Result<(), anyhow::Error> {
110 for host in self.mesh.values() {
112 if let Err(e) = host.shutdown(this).await {
113 tracing::warn!(host = %host, error = %e, "host shutdown failed");
114 }
115 }
116 Ok(())
117 }
118}