hyperactor_mesh/alloc/
sim.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9//! Support for allocating procs in the local process with simulated channels.
10
11#![allow(dead_code)] // until it is used outside of testing
12
13use std::collections::HashMap;
14
15use async_trait::async_trait;
16use hyperactor::ProcId;
17use hyperactor::WorldId;
18use hyperactor::channel::ChannelAddr;
19use hyperactor::channel::ChannelTransport;
20use hyperactor::mailbox::MailboxServerHandle;
21use hyperactor::proc::Proc;
22use ndslice::Point;
23use ndslice::view::Extent;
24
25use super::ProcStopReason;
26use crate::alloc::Alloc;
27use crate::alloc::AllocSpec;
28use crate::alloc::Allocator;
29use crate::alloc::AllocatorError;
30use crate::alloc::LocalAlloc;
31use crate::alloc::ProcState;
32use crate::shortuuid::ShortUuid;
33
34/// An allocator that runs procs in the local process with network traffic going through simulated channels.
35/// Other than transport, the underlying implementation is an inner LocalAlloc.
36pub struct SimAllocator;
37
38#[async_trait]
39impl Allocator for SimAllocator {
40    type Alloc = SimAlloc;
41
42    async fn allocate(&mut self, spec: AllocSpec) -> Result<Self::Alloc, AllocatorError> {
43        Ok(SimAlloc::new(spec))
44    }
45}
46
47impl SimAllocator {
48    #[cfg(test)]
49    pub(crate) fn new_and_start_simnet() -> Self {
50        hyperactor::simnet::start();
51        Self
52    }
53}
54
55struct SimProc {
56    proc: Proc,
57    addr: ChannelAddr,
58    handle: MailboxServerHandle,
59}
60
61/// A simulated allocation. It is a collection of procs that are running in the local process.
62pub struct SimAlloc {
63    inner: LocalAlloc,
64    created: HashMap<ShortUuid, Point>,
65}
66
67impl SimAlloc {
68    fn new(mut spec: AllocSpec) -> Self {
69        spec.transport = ChannelTransport::Sim(Box::new(ChannelTransport::Unix));
70
71        let inner = LocalAlloc::new(spec);
72        let client_proc_id = ProcId::Ranked(WorldId(format!("{}_manager", inner.name())), 0);
73
74        let ext = inner.extent();
75
76        hyperactor::simnet::simnet_handle()
77            .expect("simnet event loop not running")
78            .register_proc(
79                client_proc_id.clone(),
80                ext.point(ext.sizes().iter().map(|_| 0).collect())
81                    .expect("should be valid point"),
82            );
83
84        Self {
85            inner,
86            created: HashMap::new(),
87        }
88    }
89    /// A chaos monkey that can be used to stop procs at random.
90    pub(crate) fn chaos_monkey(&self) -> impl Fn(usize, ProcStopReason) + 'static {
91        self.inner.chaos_monkey()
92    }
93
94    /// A function to shut down the alloc for testing purposes.
95    pub(crate) fn stopper(&self) -> impl Fn() + 'static {
96        self.inner.stopper()
97    }
98
99    pub(crate) fn name(&self) -> &ShortUuid {
100        self.inner.name()
101    }
102
103    fn size(&self) -> usize {
104        self.inner.size()
105    }
106}
107
108#[async_trait]
109impl Alloc for SimAlloc {
110    async fn next(&mut self) -> Option<ProcState> {
111        let proc_state = self.inner.next().await?;
112        match &proc_state {
113            ProcState::Created {
114                create_key, point, ..
115            } => {
116                self.created.insert(create_key.clone(), point.clone());
117            }
118            ProcState::Running {
119                create_key,
120                proc_id,
121                ..
122            } => {
123                hyperactor::simnet::simnet_handle()
124                    .expect("simnet event loop not running")
125                    .register_proc(
126                        proc_id.clone(),
127                        self.created
128                            .remove(create_key)
129                            .expect("have point for create key"),
130                    );
131            }
132            _ => (),
133        }
134        Some(proc_state)
135    }
136
137    fn spec(&self) -> &AllocSpec {
138        self.inner.spec()
139    }
140
141    fn extent(&self) -> &Extent {
142        self.inner.extent()
143    }
144
145    fn world_id(&self) -> &WorldId {
146        self.inner.world_id()
147    }
148
149    async fn stop(&mut self) -> Result<(), AllocatorError> {
150        self.inner.stop().await
151    }
152}
153
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use hyperactor::simnet::BetaDistribution;
    use hyperactor::simnet::LatencyConfig;
    use hyperactor::simnet::LatencyDistribution;
    use ndslice::extent;

    use super::*;
    use crate::ProcMesh;
    use crate::RootActorMesh;
    use crate::actor_mesh::ActorMesh;
    use crate::alloc::AllocConstraints;
    use crate::alloc::test_utils::TestActor;

    #[tokio::test]
    async fn test_allocator_basic() {
        let allocator = SimAllocator::new_and_start_simnet();
        crate::alloc::testing::test_allocator_basic(allocator).await;
    }

    #[tokio::test]
    async fn test_allocator_registers_resources() {
        // Both bounds of the distribution are the same value, so every
        // sampled inter-zone latency is expected to equal it exactly.
        let latency = tokio::time::Duration::from_millis(999);
        let inter_zone = BetaDistribution::new(latency, latency, 1.0, 1.0).unwrap();
        hyperactor::simnet::start_with_config(LatencyConfig {
            inter_zone_distribution: LatencyDistribution::Beta(inter_zone),
            ..Default::default()
        });

        // Ten zones with a single proc each: any two procs sit in different zones.
        let spec = AllocSpec {
            extent: extent!(region = 1, dc = 1, zone = 10, rack = 1, host = 1, gpu = 1),
            constraints: AllocConstraints {
                match_labels: HashMap::new(),
            },
            proc_name: None,
            transport: ChannelTransport::Unix,
        };
        let alloc = SimAllocator.allocate(spec).await.unwrap();
        let proc_mesh = ProcMesh::allocate(alloc).await.unwrap();

        let handle = hyperactor::simnet::simnet_handle().unwrap();
        let actor_mesh: RootActorMesh<TestActor> = proc_mesh.spawn("echo", &()).await.unwrap();
        let actors: Vec<_> = actor_mesh.iter_actor_refs().collect();

        // Actor-to-actor latencies between distinct zones.
        for (src, dst) in [(0, 1), (2, 9)] {
            assert_eq!(
                handle.sample_latency(
                    actors[src].actor_id().proc_id(),
                    actors[dst].actor_id().proc_id()
                ),
                latency
            );
        }

        // The client proc is registered too, so its latency is sampled like
        // any other proc's.
        assert_eq!(
            handle.sample_latency(
                proc_mesh.client().self_id().proc_id(),
                actors[1].actor_id().proc_id()
            ),
            latency
        );
    }
}
230}