hyperactor_mesh/alloc/
sim.rs1#![allow(dead_code)] use std::collections::HashMap;
14
15use async_trait::async_trait;
16use hyperactor::ProcId;
17use hyperactor::WorldId;
18use hyperactor::channel::ChannelAddr;
19use hyperactor::channel::ChannelTransport;
20use hyperactor::mailbox::MailboxServerHandle;
21use hyperactor::proc::Proc;
22use ndslice::Point;
23use ndslice::view::Extent;
24
25use super::ProcStopReason;
26use crate::alloc::Alloc;
27use crate::alloc::AllocSpec;
28use crate::alloc::Allocator;
29use crate::alloc::AllocatorError;
30use crate::alloc::LocalAlloc;
31use crate::alloc::ProcState;
32use crate::shortuuid::ShortUuid;
33
/// Allocator whose allocations are [`SimAlloc`]s.
///
/// The type is a stateless unit struct: every call to `allocate` builds a
/// fresh [`SimAlloc`] from the supplied spec.
#[derive(Debug, Default)]
pub struct SimAllocator;
37
38#[async_trait]
39impl Allocator for SimAllocator {
40 type Alloc = SimAlloc;
41
42 async fn allocate(&mut self, spec: AllocSpec) -> Result<Self::Alloc, AllocatorError> {
43 Ok(SimAlloc::new(spec))
44 }
45}
46
47impl SimAllocator {
48 #[cfg(test)]
49 pub(crate) fn new_and_start_simnet() -> Self {
50 hyperactor::simnet::start();
51 Self
52 }
53}
54
55struct SimProc {
56 proc: Proc,
57 addr: ChannelAddr,
58 handle: MailboxServerHandle,
59}
60
61pub struct SimAlloc {
63 inner: LocalAlloc,
64 created: HashMap<ShortUuid, Point>,
65}
66
67impl SimAlloc {
68 fn new(mut spec: AllocSpec) -> Self {
69 spec.transport = ChannelTransport::Sim(Box::new(ChannelTransport::Unix));
70
71 let inner = LocalAlloc::new(spec);
72 let client_proc_id = ProcId::Ranked(WorldId(format!("{}_manager", inner.name())), 0);
73
74 let ext = inner.extent();
75
76 hyperactor::simnet::simnet_handle()
77 .expect("simnet event loop not running")
78 .register_proc(
79 client_proc_id.clone(),
80 ext.point(ext.sizes().iter().map(|_| 0).collect())
81 .expect("should be valid point"),
82 );
83
84 Self {
85 inner,
86 created: HashMap::new(),
87 }
88 }
89 pub(crate) fn chaos_monkey(&self) -> impl Fn(usize, ProcStopReason) + 'static {
91 self.inner.chaos_monkey()
92 }
93
94 pub(crate) fn stopper(&self) -> impl Fn() + 'static {
96 self.inner.stopper()
97 }
98
99 pub(crate) fn name(&self) -> &ShortUuid {
100 self.inner.name()
101 }
102
103 fn size(&self) -> usize {
104 self.inner.size()
105 }
106}
107
108#[async_trait]
109impl Alloc for SimAlloc {
110 async fn next(&mut self) -> Option<ProcState> {
111 let proc_state = self.inner.next().await?;
112 match &proc_state {
113 ProcState::Created {
114 create_key, point, ..
115 } => {
116 self.created.insert(create_key.clone(), point.clone());
117 }
118 ProcState::Running {
119 create_key,
120 proc_id,
121 ..
122 } => {
123 hyperactor::simnet::simnet_handle()
124 .expect("simnet event loop not running")
125 .register_proc(
126 proc_id.clone(),
127 self.created
128 .remove(create_key)
129 .expect("have point for create key"),
130 );
131 }
132 _ => (),
133 }
134 Some(proc_state)
135 }
136
137 fn spec(&self) -> &AllocSpec {
138 self.inner.spec()
139 }
140
141 fn extent(&self) -> &Extent {
142 self.inner.extent()
143 }
144
145 fn world_id(&self) -> &WorldId {
146 self.inner.world_id()
147 }
148
149 async fn stop(&mut self) -> Result<(), AllocatorError> {
150 self.inner.stop().await
151 }
152}
153
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use hyperactor::simnet::BetaDistribution;
    use hyperactor::simnet::LatencyConfig;
    use hyperactor::simnet::LatencyDistribution;
    use ndslice::extent;

    use super::*;
    use crate::ProcMesh;
    use crate::RootActorMesh;
    use crate::actor_mesh::ActorMesh;
    use crate::alloc::AllocConstraints;
    use crate::alloc::test_utils::TestActor;

    /// Runs the shared allocator test suite against `SimAllocator`.
    #[tokio::test]
    async fn test_allocator_basic() {
        hyperactor::simnet::start();
        crate::alloc::testing::test_allocator_basic(SimAllocator).await;
    }

    /// Allocates ten procs, spawns an actor mesh on them, and checks that
    /// the simnet handle reports the configured latency between pairs of
    /// their proc ids and between the client and a proc.
    #[tokio::test]
    async fn test_allocator_registers_resources() {
        // Both duration arguments of the distribution are 999 ms; the
        // assertions below expect every sampled latency to equal that value.
        let fixed_latency = tokio::time::Duration::from_millis(999);
        let inter_zone =
            BetaDistribution::new(fixed_latency, fixed_latency, 1.0, 1.0).unwrap();
        hyperactor::simnet::start_with_config(LatencyConfig {
            inter_zone_distribution: LatencyDistribution::Beta(inter_zone),
            ..Default::default()
        });

        let spec = AllocSpec {
            extent: extent!(region = 1, dc = 1, zone = 10, rack = 1, host = 1, gpu = 1),
            constraints: AllocConstraints {
                match_labels: HashMap::new(),
            },
            proc_name: None,
            transport: ChannelTransport::Unix,
        };
        let alloc = SimAllocator.allocate(spec).await.unwrap();

        let proc_mesh = ProcMesh::allocate(alloc).await.unwrap();

        let handle = hyperactor::simnet::simnet_handle().unwrap();
        let actor_mesh: RootActorMesh<TestActor> = proc_mesh.spawn("echo", &()).await.unwrap();
        let actors: Vec<_> = actor_mesh.iter_actor_refs().collect();

        // Two procs of the mesh.
        assert_eq!(
            handle.sample_latency(
                actors[0].actor_id().proc_id(),
                actors[1].actor_id().proc_id()
            ),
            fixed_latency
        );
        // Another pair, including the last proc of the mesh.
        assert_eq!(
            handle.sample_latency(
                actors[2].actor_id().proc_id(),
                actors[9].actor_id().proc_id()
            ),
            fixed_latency
        );
        // The client proc against a mesh proc.
        assert_eq!(
            handle.sample_latency(
                proc_mesh.client().self_id().proc_id(),
                actors[1].actor_id().proc_id()
            ),
            fixed_latency
        );
    }
}