hyperactor_mesh/alloc/
sim.rs1#![allow(dead_code)] use async_trait::async_trait;
14use hyperactor::ProcId;
15use hyperactor::WorldId;
16use hyperactor::channel::ChannelAddr;
17use hyperactor::channel::ChannelTransport;
18use hyperactor::mailbox::MailboxServerHandle;
19use hyperactor::proc::Proc;
20use ndslice::view::Extent;
21
22use super::ProcStopReason;
23use crate::alloc::Alloc;
24use crate::alloc::AllocSpec;
25use crate::alloc::Allocator;
26use crate::alloc::AllocatorError;
27use crate::alloc::LocalAlloc;
28use crate::alloc::ProcState;
29use crate::shortuuid::ShortUuid;
30
/// An [`Allocator`] whose allocations are backed by a [`LocalAlloc`] using the simulated
/// channel transport (`ChannelTransport::Sim` wrapping `ChannelTransport::Unix`).
///
/// Every proc an allocation creates is registered with the simnet, so that simulated
/// latencies can take its position in the allocation's extent into account.
///
/// The simnet event loop must already be running when [`Allocator::allocate`] is called;
/// otherwise allocation panics.
#[derive(Debug, Default, Clone, Copy)]
pub struct SimAllocator;
34
35#[async_trait]
36impl Allocator for SimAllocator {
37 type Alloc = SimAlloc;
38
39 async fn allocate(&mut self, spec: AllocSpec) -> Result<Self::Alloc, AllocatorError> {
40 Ok(SimAlloc::new(spec))
41 }
42}
43
44impl SimAllocator {
45 #[cfg(test)]
46 pub(crate) fn new_and_start_simnet() -> Self {
47 hyperactor::simnet::start();
48 Self
49 }
50}
51
/// Bundles a [`Proc`] with a [`ChannelAddr`] and a [`MailboxServerHandle`].
///
/// NOTE(review): nothing in the visible code constructs or reads this type (the module is
/// compiled with `#![allow(dead_code)]`); confirm whether it is still needed.
struct SimProc {
    /// The proc itself.
    proc: Proc,
    /// Presumably the address the proc's mailbox server is bound to — TODO confirm.
    addr: ChannelAddr,
    /// Presumably the handle of the mailbox server serving `proc` at `addr` — TODO confirm.
    handle: MailboxServerHandle,
}
57
/// An [`Alloc`] backed by a [`LocalAlloc`] that uses the simulated channel transport.
///
/// Each proc it creates is registered with the simnet at the proc's point in the
/// allocation's extent.
pub struct SimAlloc {
    /// The underlying local allocation; the [`Alloc`] methods delegate to it.
    inner: LocalAlloc,
}
62
63impl SimAlloc {
64 fn new(spec: AllocSpec) -> Self {
65 let inner = LocalAlloc::new_with_transport(
66 spec,
67 ChannelTransport::Sim(Box::new(ChannelTransport::Unix)),
68 );
69 let client_proc_id = ProcId::Ranked(WorldId(format!("{}_manager", inner.name())), 0);
70
71 let ext = inner.extent();
72
73 hyperactor::simnet::simnet_handle()
74 .expect("simnet event loop not running")
75 .register_proc(
76 client_proc_id.clone(),
77 ext.point(ext.sizes().iter().map(|_| 0).collect())
78 .expect("should be valid point"),
79 );
80
81 Self { inner }
82 }
83 pub(crate) fn chaos_monkey(&self) -> impl Fn(usize, ProcStopReason) + 'static {
85 self.inner.chaos_monkey()
86 }
87
88 pub(crate) fn stopper(&self) -> impl Fn() + 'static {
90 self.inner.stopper()
91 }
92
93 pub(crate) fn name(&self) -> &ShortUuid {
94 self.inner.name()
95 }
96
97 fn size(&self) -> usize {
98 self.inner.size()
99 }
100}
101
102#[async_trait]
103impl Alloc for SimAlloc {
104 async fn next(&mut self) -> Option<ProcState> {
105 let proc_state = self.inner.next().await;
106 if let Some(ProcState::Created { proc_id, point, .. }) = &proc_state {
107 hyperactor::simnet::simnet_handle()
108 .expect("simnet event loop not running")
109 .register_proc(proc_id.clone(), point.clone());
110 }
111 proc_state
112 }
113
114 fn extent(&self) -> &Extent {
115 self.inner.extent()
116 }
117
118 fn world_id(&self) -> &WorldId {
119 self.inner.world_id()
120 }
121
122 fn transport(&self) -> ChannelTransport {
123 self.inner.transport()
124 }
125
126 async fn stop(&mut self) -> Result<(), AllocatorError> {
127 self.inner.stop().await
128 }
129}
130
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use hyperactor::simnet::BetaDistribution;
    use hyperactor::simnet::LatencyConfig;
    use hyperactor::simnet::LatencyDistribution;
    use ndslice::extent;

    use super::*;
    use crate::ProcMesh;
    use crate::RootActorMesh;
    use crate::actor_mesh::ActorMesh;
    use crate::alloc::AllocConstraints;
    use crate::alloc::test_utils::TestActor;

    /// Runs the shared allocator conformance test against a `SimAllocator`.
    #[tokio::test]
    async fn test_allocator_basic() {
        crate::alloc::testing::test_allocator_basic(SimAllocator::new_and_start_simnet()).await;
    }

    /// Checks that allocated procs, and the mesh's client proc, are registered with the simnet.
    ///
    /// Procs are placed across 10 zones. Sampled latencies between them are then expected to
    /// follow the pinned inter-zone distribution.
    #[tokio::test]
    async fn test_allocator_registers_resources() {
        // Lower and upper bounds are both 999ms, so every inter-zone sample is exactly 999ms.
        let inter_zone = tokio::time::Duration::from_millis(999);
        let distribution = BetaDistribution::new(inter_zone, inter_zone, 1.0, 1.0).unwrap();
        hyperactor::simnet::start_with_config(LatencyConfig {
            inter_zone_distribution: LatencyDistribution::Beta(distribution),
            ..Default::default()
        });

        let spec = AllocSpec {
            extent: extent!(region = 1, dc = 1, zone = 10, rack = 1, host = 1, gpu = 1),
            constraints: AllocConstraints {
                match_labels: HashMap::new(),
            },
        };
        let alloc = SimAllocator.allocate(spec).await.unwrap();

        let proc_mesh = ProcMesh::allocate(alloc).await.unwrap();

        let handle = hyperactor::simnet::simnet_handle().unwrap();
        let actor_mesh: RootActorMesh<TestActor> = proc_mesh.spawn("echo", &()).await.unwrap();
        let actors: Vec<_> = actor_mesh.iter_actor_refs().collect();

        let expect_inter_zone = |src: &ProcId, dst: &ProcId| {
            assert_eq!(handle.sample_latency(src, dst), inter_zone);
        };
        expect_inter_zone(actors[0].actor_id().proc_id(), actors[1].actor_id().proc_id());
        expect_inter_zone(actors[2].actor_id().proc_id(), actors[9].actor_id().proc_id());
        expect_inter_zone(
            proc_mesh.client().actor_id().proc_id(),
            actors[1].actor_id().proc_id(),
        );
    }
}