hyperactor_mesh/alloc/
sim.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9//! Support for allocating procs in the local process with simulated channels.
10
11#![allow(dead_code)] // until it is used outside of testing
12
13use async_trait::async_trait;
14use hyperactor::ProcId;
15use hyperactor::WorldId;
16use hyperactor::channel::ChannelAddr;
17use hyperactor::channel::ChannelTransport;
18use hyperactor::mailbox::MailboxServerHandle;
19use hyperactor::proc::Proc;
20use ndslice::view::Extent;
21
22use super::ProcStopReason;
23use crate::alloc::Alloc;
24use crate::alloc::AllocSpec;
25use crate::alloc::Allocator;
26use crate::alloc::AllocatorError;
27use crate::alloc::LocalAlloc;
28use crate::alloc::ProcState;
29use crate::shortuuid::ShortUuid;
30
/// An [`Allocator`] whose procs run inside the local process while their
/// network traffic is routed through simulated channels.
///
/// Apart from the transport, all of the work is done by an inner
/// `LocalAlloc` (see [`SimAlloc`]).
///
/// The type is a stateless unit struct, so it derives `Debug` (for logging
/// and for embedding in other `Debug` types) and `Default`.
#[derive(Debug, Default)]
pub struct SimAllocator;
34
35#[async_trait]
36impl Allocator for SimAllocator {
37    type Alloc = SimAlloc;
38
39    async fn allocate(&mut self, spec: AllocSpec) -> Result<Self::Alloc, AllocatorError> {
40        Ok(SimAlloc::new(spec))
41    }
42}
43
44impl SimAllocator {
45    #[cfg(test)]
46    pub(crate) fn new_and_start_simnet() -> Self {
47        hyperactor::simnet::start();
48        Self
49    }
50}
51
52struct SimProc {
53    proc: Proc,
54    addr: ChannelAddr,
55    handle: MailboxServerHandle,
56}
57
58/// A simulated allocation. It is a collection of procs that are running in the local process.
59pub struct SimAlloc {
60    inner: LocalAlloc,
61}
62
63impl SimAlloc {
64    fn new(spec: AllocSpec) -> Self {
65        let inner = LocalAlloc::new_with_transport(
66            spec,
67            ChannelTransport::Sim(Box::new(ChannelTransport::Unix)),
68        );
69        let client_proc_id = ProcId::Ranked(WorldId(format!("{}_manager", inner.name())), 0);
70
71        let ext = inner.extent();
72
73        hyperactor::simnet::simnet_handle()
74            .expect("simnet event loop not running")
75            .register_proc(
76                client_proc_id.clone(),
77                ext.point(ext.sizes().iter().map(|_| 0).collect())
78                    .expect("should be valid point"),
79            );
80
81        Self { inner }
82    }
83    /// A chaos monkey that can be used to stop procs at random.
84    pub(crate) fn chaos_monkey(&self) -> impl Fn(usize, ProcStopReason) + 'static {
85        self.inner.chaos_monkey()
86    }
87
88    /// A function to shut down the alloc for testing purposes.
89    pub(crate) fn stopper(&self) -> impl Fn() + 'static {
90        self.inner.stopper()
91    }
92
93    pub(crate) fn name(&self) -> &ShortUuid {
94        self.inner.name()
95    }
96
97    fn size(&self) -> usize {
98        self.inner.size()
99    }
100}
101
102#[async_trait]
103impl Alloc for SimAlloc {
104    async fn next(&mut self) -> Option<ProcState> {
105        let proc_state = self.inner.next().await;
106        if let Some(ProcState::Created { proc_id, point, .. }) = &proc_state {
107            hyperactor::simnet::simnet_handle()
108                .expect("simnet event loop not running")
109                .register_proc(proc_id.clone(), point.clone());
110        }
111        proc_state
112    }
113
114    fn extent(&self) -> &Extent {
115        self.inner.extent()
116    }
117
118    fn world_id(&self) -> &WorldId {
119        self.inner.world_id()
120    }
121
122    fn transport(&self) -> ChannelTransport {
123        self.inner.transport()
124    }
125
126    async fn stop(&mut self) -> Result<(), AllocatorError> {
127        self.inner.stop().await
128    }
129}
130
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use hyperactor::simnet::BetaDistribution;
    use hyperactor::simnet::LatencyConfig;
    use hyperactor::simnet::LatencyDistribution;
    use ndslice::extent;

    use super::*;
    use crate::ProcMesh;
    use crate::RootActorMesh;
    use crate::actor_mesh::ActorMesh;
    use crate::alloc::AllocConstraints;
    use crate::alloc::test_utils::TestActor;

    /// Runs `crate::alloc::testing::test_allocator_basic` against
    /// `SimAllocator`.
    #[tokio::test]
    async fn test_allocator_basic() {
        // `SimAlloc::new` needs a running simnet, so start it first.
        hyperactor::simnet::start();
        let allocator = SimAllocator;
        crate::alloc::testing::test_allocator_basic(allocator).await;
    }

    /// Checks that the allocated procs and the client proc are registered
    /// with the simnet, by sampling the latency between pairs of them.
    #[tokio::test]
    async fn test_allocator_registers_resources() {
        // Both duration arguments of the distribution are 999ms; the
        // assertions below expect every sample to be exactly this value.
        let fixed_latency = tokio::time::Duration::from_millis(999);
        let inter_zone = BetaDistribution::new(fixed_latency, fixed_latency, 1.0, 1.0).unwrap();
        hyperactor::simnet::start_with_config(LatencyConfig {
            inter_zone_distribution: LatencyDistribution::Beta(inter_zone),
            ..Default::default()
        });

        // Ten procs that differ only in their `zone` coordinate.
        let spec = AllocSpec {
            extent: extent!(region = 1, dc = 1, zone = 10, rack = 1, host = 1, gpu = 1),
            constraints: AllocConstraints {
                match_labels: HashMap::new(),
            },
        };
        let sim_alloc = SimAllocator.allocate(spec).await.unwrap();

        let proc_mesh = ProcMesh::allocate(sim_alloc).await.unwrap();

        let simnet = hyperactor::simnet::simnet_handle().unwrap();
        let actor_mesh: RootActorMesh<TestActor> = proc_mesh.spawn("echo", &()).await.unwrap();
        let actor_refs: Vec<_> = actor_mesh.iter_actor_refs().collect();

        // Two pairs of allocated procs.
        assert_eq!(
            simnet.sample_latency(
                actor_refs[0].actor_id().proc_id(),
                actor_refs[1].actor_id().proc_id()
            ),
            fixed_latency
        );
        assert_eq!(
            simnet.sample_latency(
                actor_refs[2].actor_id().proc_id(),
                actor_refs[9].actor_id().proc_id()
            ),
            fixed_latency
        );
        // The client proc and an allocated proc.
        assert_eq!(
            simnet.sample_latency(
                proc_mesh.client().actor_id().proc_id(),
                actor_refs[1].actor_id().proc_id()
            ),
            fixed_latency
        );
    }
}