monarch_hyperactor/
alloc.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9use std::collections::HashMap;
10use std::str::FromStr;
11use std::sync::Arc;
12
13use anyhow::anyhow;
14use async_trait::async_trait;
15use hyperactor::channel::ChannelAddr;
16use hyperactor::channel::ChannelTransport;
17use hyperactor::channel::TlsAddr;
18use hyperactor_mesh::alloc::Alloc;
19use hyperactor_mesh::alloc::AllocConstraints;
20use hyperactor_mesh::alloc::AllocName;
21use hyperactor_mesh::alloc::AllocSpec;
22use hyperactor_mesh::alloc::Allocator;
23use hyperactor_mesh::alloc::AllocatorError;
24use hyperactor_mesh::alloc::LocalAllocator;
25use hyperactor_mesh::alloc::ProcState;
26use hyperactor_mesh::alloc::ProcessAllocator;
27use hyperactor_mesh::alloc::remoteprocess::RemoteProcessAlloc;
28use hyperactor_mesh::alloc::remoteprocess::RemoteProcessAllocHost;
29use hyperactor_mesh::alloc::remoteprocess::RemoteProcessAllocInitializer;
30use hyperactor_mesh::bootstrap::BootstrapCommand;
31use hyperactor_mesh::transport::default_transport;
32use ndslice::Extent;
33use pyo3::exceptions::PyRuntimeError;
34use pyo3::exceptions::PyValueError;
35use pyo3::prelude::*;
36use pyo3::types::PyDict;
37use tokio::process::Command;
38
39use crate::channel::PyChannelAddr;
40use crate::host_mesh::PyBootstrapCommand;
41use crate::pytokio::PyPythonTask;
42use crate::runtime::get_tokio_runtime;
43use crate::runtime::monarch_with_gil;
44use crate::runtime::monarch_with_gil_blocking;
45
46/// Convert a PyDict to an Extent
47fn pydict_to_extent(shape: &Bound<'_, PyDict>) -> PyResult<Extent> {
48    let mut labels = Vec::new();
49    let mut sizes = Vec::new();
50    for (key, value) in shape {
51        labels.push(key.extract::<String>()?);
52        sizes.push(value.extract::<usize>()?);
53    }
54    Ok(Extent::new(labels, sizes).unwrap())
55}
56
/// Python-visible handle wrapping a Rust [`Alloc`] trait object.
///
/// Internals are not exposed to Python. The wrapped alloc lives in an
/// `Option`, so it can be moved out at most once (after which `inner` is
/// `None`); this ensures the Alloc is only consumed (i.e. moved) once in Rust.
#[pyclass(
    name = "Alloc",
    module = "monarch._rust_bindings.monarch_hyperactor.alloc"
)]
pub struct PyAlloc {
    /// The wrapped alloc, or `None` once it has been taken.
    pub inner: Option<Box<dyn Alloc + Sync + Send>>,
    /// Bootstrap command carried alongside the alloc, if any. In this module
    /// only the process allocator supplies one; the local and remote
    /// allocators pass `None`.
    pub bootstrap_command: Option<BootstrapCommand>,
}
68
69struct ReshapedAlloc {
70    extent: Extent,
71    spec: AllocSpec,
72    base: Box<dyn Alloc + Sync + Send>,
73}
74
75impl ReshapedAlloc {
76    fn new(extent: Extent, base: Box<dyn Alloc + Sync + Send>) -> Self {
77        let mut spec = base.spec().clone();
78        spec.extent = extent.clone();
79        Self { extent, spec, base }
80    }
81}
82
83#[async_trait]
84impl Alloc for ReshapedAlloc {
85    async fn next(&mut self) -> Option<ProcState> {
86        self.base.next().await
87    }
88
89    fn extent(&self) -> &Extent {
90        &self.extent
91    }
92
93    fn spec(&self) -> &AllocSpec {
94        &self.spec
95    }
96
97    fn alloc_name(&self) -> &AllocName {
98        self.base.alloc_name()
99    }
100
101    async fn stop(&mut self) -> Result<(), AllocatorError> {
102        self.base.stop().await
103    }
104}
105
106impl PyAlloc {
107    /// Create a new PyAlloc with provided boxed trait.
108    pub fn new(
109        inner: Box<dyn Alloc + Sync + Send>,
110        bootstrap_command: Option<BootstrapCommand>,
111    ) -> Self {
112        Self {
113            inner: Some(inner),
114            bootstrap_command,
115        }
116    }
117
118    /// Take the internal Alloc object.
119    pub fn take(&mut self) -> Option<Box<dyn Alloc + Sync + Send>> {
120        self.inner.take()
121    }
122}
123
124#[pymethods]
125impl PyAlloc {
126    fn __repr__(&self) -> PyResult<String> {
127        match &self.inner {
128            None => Ok("Alloc(None)".to_string()),
129            Some(wrapper) => Ok(format!("Alloc({})", wrapper.shape())),
130        }
131    }
132    pub fn reshape(&mut self, shape: &Bound<'_, PyDict>) -> PyResult<Option<PyAlloc>> {
133        let alloc = self.take();
134        alloc
135            .map(|alloc| {
136                let extent = alloc.extent();
137                let old_num_elements = extent.num_ranks();
138
139                // Create extent from the PyDict
140                let new_extent = pydict_to_extent(shape)?;
141
142                let new_elements = new_extent.num_ranks();
143                if old_num_elements != new_elements {
144                    return Err(PyErr::new::<PyValueError, _>(format!(
145                        "cannot reshape {} != {}",
146                        old_num_elements, new_elements
147                    )));
148                }
149                Ok(PyAlloc::new(
150                    Box::new(ReshapedAlloc::new(new_extent, alloc)),
151                    self.bootstrap_command.clone(),
152                ))
153            })
154            .transpose()
155    }
156}
157
/// Python-visible wrapper around [`AllocConstraints`].
///
/// Only `match_labels` is exposed to Python.
#[pyclass(
    name = "AllocConstraints",
    module = "monarch._rust_bindings.monarch_hyperactor.alloc"
)]
pub struct PyAllocConstraints {
    // The wrapped constraints. They are cloned (not shared) when handed to an
    // `AllocSpec` or returned from a getter.
    inner: AllocConstraints,
}
165
166#[pymethods]
167impl PyAllocConstraints {
168    #[new]
169    #[pyo3(signature = (match_labels=None))]
170    fn new(match_labels: Option<HashMap<String, String>>) -> PyResult<Self> {
171        let mut constraints = AllocConstraints::default();
172        if let Some(match_lables) = match_labels {
173            constraints.match_labels = match_lables;
174        }
175
176        Ok(Self { inner: constraints })
177    }
178
179    #[getter]
180    fn match_labels(&self) -> PyResult<HashMap<String, String>> {
181        Ok(self.inner.match_labels.clone())
182    }
183}
184
/// Python-visible allocation spec: an extent plus [`AllocConstraints`].
///
/// Convert it to a Rust [`AllocSpec`] with `AllocSpec::from(&py_spec)`. That
/// conversion resolves the transport as described on the `transport` field.
#[pyclass(
    name = "AllocSpec",
    module = "monarch._rust_bindings.monarch_hyperactor.alloc"
)]
pub struct PyAllocSpec {
    // The spec built by the Python constructor. Its `transport` is filled in at
    // construction time but is always overwritten during conversion.
    inner: AllocSpec,
    // When this PyAllocSpec is converted to AllocSpec, if this
    // field does not have a value, then the returned AllocSpec
    // will be a clone of `inner` with the transport field set
    // to the current default transport. If the field does have
    // a value, the returned AllocSpec will be a clone of `inner`
    // with the transport field set to this value.
    // NOTE(review): the only constructor in this file sets this to `None`.
    transport: Option<ChannelTransport>,
}
199
200#[pymethods]
201impl PyAllocSpec {
202    #[new]
203    #[pyo3(signature = (constraints, **kwargs))]
204    fn new(constraints: &PyAllocConstraints, kwargs: Option<&Bound<'_, PyDict>>) -> PyResult<Self> {
205        let mut keys = Vec::new();
206        let mut values = Vec::new();
207
208        if let Some(kwargs) = kwargs {
209            for (key, value) in kwargs {
210                keys.push(key.clone());
211                values.push(value.clone());
212            }
213        };
214
215        let extent = Extent::new(
216            keys.into_iter()
217                .map(|key| key.extract::<String>())
218                .collect::<PyResult<Vec<String>>>()?,
219            values
220                .into_iter()
221                .map(|key| key.extract::<usize>())
222                .collect::<PyResult<Vec<usize>>>()?,
223        )
224        .map_err(|e| PyValueError::new_err(format!("Invalid extent: {:?}", e)))?;
225
226        Ok(Self {
227            inner: AllocSpec {
228                extent,
229                constraints: constraints.inner.clone(),
230                proc_name: None,
231                transport: default_transport(),
232                proc_allocation_mode: Default::default(),
233            },
234            transport: None,
235        })
236    }
237
238    #[getter]
239    fn extent<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyDict>> {
240        let d = PyDict::new(py);
241        for (name, size) in self.inner.extent.iter() {
242            d.set_item(name, size)?;
243        }
244        Ok(d)
245    }
246
247    #[getter]
248    fn constraints(&self) -> PyResult<PyAllocConstraints> {
249        Ok(PyAllocConstraints {
250            inner: self.inner.constraints.clone(),
251        })
252    }
253}
254
255impl From<&PyAllocSpec> for AllocSpec {
256    fn from(spec: &PyAllocSpec) -> Self {
257        let mut inner = spec.inner.clone();
258        inner.transport = spec.transport.clone().unwrap_or_else(default_transport);
259        inner
260    }
261}
262
/// Python base class (`LocalAllocatorBase`, subclassable from Python) for
/// allocations backed by [`LocalAllocator`]. It holds no state.
#[pyclass(
    name = "LocalAllocatorBase",
    module = "monarch._rust_bindings.monarch_hyperactor.alloc",
    subclass
)]
pub struct PyLocalAllocator;
269
270#[pymethods]
271impl PyLocalAllocator {
272    #[new]
273    fn new() -> Self {
274        PyLocalAllocator {}
275    }
276
277    fn allocate_nonblocking(&self, spec: &PyAllocSpec) -> PyResult<PyPythonTask> {
278        // We could use Bound here, and acquire the GIL inside of `future_into_py`, but
279        // it is rather awkward with the current APIs, and we can anyway support Arc/Mutex
280        // pretty easily.
281        let spec = spec.into();
282        PyPythonTask::new(async move {
283            LocalAllocator
284                .allocate(spec)
285                .await
286                .map(|inner| PyAlloc::new(Box::new(inner), None))
287                .map_err(|e| PyRuntimeError::new_err(format!("{}", e)))
288        })
289    }
290}
291
/// Python base class (`ProcessAllocatorBase`, subclassable from Python)
/// wrapping a [`ProcessAllocator`].
#[pyclass(
    name = "ProcessAllocatorBase",
    module = "monarch._rust_bindings.monarch_hyperactor.alloc",
    subclass
)]
pub struct PyProcessAllocator {
    // Shared with the futures returned by `allocate_nonblocking`. The async
    // mutex serializes `allocate` calls, which require `&mut ProcessAllocator`.
    inner: Arc<tokio::sync::Mutex<ProcessAllocator>>,
}
300
301#[pymethods]
302impl PyProcessAllocator {
303    #[new]
304    #[pyo3(signature = (cmd, args=None, env=None))]
305    fn new(cmd: String, args: Option<Vec<String>>, env: Option<HashMap<String, String>>) -> Self {
306        let mut cmd = Command::new(cmd);
307        if let Some(args) = args {
308            cmd.args(args);
309        }
310        if let Some(env) = env {
311            cmd.envs(env);
312        }
313        Self {
314            inner: Arc::new(tokio::sync::Mutex::new(ProcessAllocator::new(cmd))),
315        }
316    }
317
318    fn allocate_nonblocking(&self, py: Python<'_>, spec: &PyAllocSpec) -> PyResult<PyPythonTask> {
319        // We could use Bound here, and acquire the GIL inside of `future_into_py`, but
320        // it is rather awkward with the current APIs, and we can anyway support Arc/Mutex
321        // pretty easily.
322        let instance = Arc::clone(&self.inner);
323        let spec = spec.into();
324        let bootstrap_command = PyBootstrapCommand::default(py)?.borrow().to_rust();
325        PyPythonTask::new(async move {
326            instance
327                .lock()
328                .await
329                .allocate(spec)
330                .await
331                .map(|inner| PyAlloc::new(Box::new(inner), Some(bootstrap_command)))
332                .map_err(|e| PyRuntimeError::new_err(format!("{:?}", e)))
333        })
334    }
335}
336
/// A [`hyperactor_mesh::alloc::remoteprocess::RemoteProcessAllocInitializer`] wrapper to
/// enable subclassing from Python.
///
/// Basically follows <https://pyo3.rs/v0.25.0/trait-bounds.html>.
///
/// The wrapped Python object must provide an `initialize_alloc` method.
///
/// * It is called with a single positional argument: the constraints'
///   `match_labels` (a `dict[str, str]`).
/// * It must return a coroutine (e.g. be declared as
///   `async def initialize_alloc(self, match_labels) -> list[str]`), because
///   the result is passed to `asyncio.run`.
/// * Each string in the awaited list is parsed as a channel address.
pub struct PyRemoteProcessAllocInitializer {
    // Instance of a Python subclass of
    // `monarch._rust_bindings.monarch_hyperactor.alloc.RemoteProcessAllocInitializer`.
    py_inner: Py<PyAny>,

    // Allocation constraints taken from the spec passed to the allocator's
    // `allocate` call; their `match_labels` are forwarded to the Python initializer.
    constraints: AllocConstraints,
}
348
349impl PyRemoteProcessAllocInitializer {
350    /// calls the initializer's `initialize_alloc()` as implemented in python
351    ///
352    /// NOTE: changes to python method calls must be made in sync with
353    ///   the method signature of `RemoteAllocInitializer` in
354    ///   `monarch/python/monarch/_rust_bindings/monarch_hyperactor/alloc.pyi`
355    async fn py_initialize_alloc(&self) -> PyResult<Vec<String>> {
356        let args = (&self.constraints.match_labels,);
357        let coro = monarch_with_gil(|py| -> PyResult<Py<PyAny>> {
358            self.py_inner
359                .bind(py)
360                .call_method1("initialize_alloc", args)
361                .map(|x| x.unbind())
362        })
363        .await?;
364        let r = get_tokio_runtime().spawn_blocking(move || -> PyResult<Vec<String>> {
365            // call the function as implemented in python
366            monarch_with_gil_blocking(|py| {
367                let asyncio = py.import("asyncio").unwrap();
368                let addrs = asyncio.call_method1("run", (coro,))?;
369                let addrs: PyResult<Vec<String>> = addrs.extract();
370                addrs
371            })
372        });
373
374        r.await
375            .map_err(|err| PyRuntimeError::new_err(err.to_string()))?
376    }
377
378    async fn get_transport_and_port(&self) -> PyResult<(ChannelTransport, u16)> {
379        // NOTE: the upstream RemoteAllocator APIs take (transport, port, hostnames)
380        //   (e.g. assumes the same transport and port for all servers).
381        //   Until that is fixed we have to assume the same here.
382        //   Get the transport and port from the first address
383        // TODO T227130269
384        let addrs = self.py_initialize_alloc().await?;
385        let addr = addrs
386            .first()
387            .ok_or_else(|| anyhow!("initializer must return non-empty list of addresses"))?;
388        let channel_addr = PyChannelAddr::parse(addr)?;
389        let port = channel_addr.get_port()?;
390        let transport = channel_addr.get_transport()?;
391        Ok((transport.into(), port))
392    }
393}
394
395#[async_trait]
396impl RemoteProcessAllocInitializer for PyRemoteProcessAllocInitializer {
397    async fn initialize_alloc(&mut self) -> Result<Vec<RemoteProcessAllocHost>, anyhow::Error> {
398        // call the function as implemented in python
399        let addrs = self.py_initialize_alloc().await?;
400        addrs
401            .iter()
402            .map(|channel_addr| {
403                let addr = ChannelAddr::from_str(channel_addr)?;
404                let (id, hostname) = match addr {
405                    ChannelAddr::Tcp(socket) => {
406                        if socket.is_ipv6() {
407                            // ipv6 addresses need to be wrapped in square-brackets [ipv6_addr]
408                            // since the return value here gets concatenated with 'port' to make up a sockaddr
409                            let ipv6_addr = format!("[{}]", socket.ip());
410                            (ipv6_addr.clone(), ipv6_addr.clone())
411                        } else {
412                            let ipv4_addr = socket.ip().to_string();
413                            (ipv4_addr.clone(), ipv4_addr.clone())
414                        }
415                    }
416                    ChannelAddr::MetaTls(TlsAddr { hostname, .. }) => {
417                        (hostname.clone(), hostname.clone())
418                    }
419                    ChannelAddr::Unix(_) => (addr.to_string(), addr.to_string()),
420                    _ => anyhow::bail!("unsupported transport for channel address: `{addr}`"),
421                };
422                Ok(RemoteProcessAllocHost { id, hostname })
423            })
424            .collect()
425    }
426}
427
/// Python base class (`RemoteAllocatorBase`, subclassable from Python) that
/// allocates through [`RemoteProcessAlloc`]. A Python initializer object
/// supplies the remote host addresses.
#[pyclass(
    name = "RemoteAllocatorBase",
    module = "monarch._rust_bindings.monarch_hyperactor.alloc",
    subclass
)]
pub struct PyRemoteAllocator {
    // Name given (as an `AllocName`) to every alloc produced by this allocator.
    alloc_name: String,
    // Python object implementing `initialize_alloc`; wrapped in a
    // `PyRemoteProcessAllocInitializer` for each allocation.
    initializer: Py<PyAny>,
}
437
438impl Clone for PyRemoteAllocator {
439    fn clone(&self) -> Self {
440        Self {
441            alloc_name: self.alloc_name.clone(),
442            initializer: monarch_with_gil_blocking(|py| Py::clone_ref(&self.initializer, py)),
443        }
444    }
445}
446#[async_trait]
447impl Allocator for PyRemoteAllocator {
448    type Alloc = RemoteProcessAlloc;
449
450    async fn allocate(&mut self, spec: AllocSpec) -> Result<Self::Alloc, AllocatorError> {
451        let py_inner = monarch_with_gil(|py| Py::clone_ref(&self.initializer, py)).await;
452        let constraints = spec.constraints.clone();
453        let initializer = PyRemoteProcessAllocInitializer {
454            py_inner,
455            constraints,
456        };
457
458        let (transport, port) = initializer
459            .get_transport_and_port()
460            .await
461            .map_err(|e| AllocatorError::Other(e.into()))?;
462
463        let mut spec = spec;
464        if spec.transport != transport {
465            monarch_with_gil(|py| -> PyResult<()> {
466                // TODO(slurye): Temporary until we start enforcing that people have properly
467                // configured the default transport.
468                py.import("warnings")?.getattr("warn")?.call1((format!(
469                    "The AllocSpec passed to RemoteAllocator.allocate has transport {}, \
470                        but the transport from the remote process alloc initializer is {}. \
471                        This will soon be an error unless you explicitly configure monarch's \
472                        default transport to {}. The current default transport is {}.",
473                    spec.transport,
474                    transport,
475                    transport,
476                    default_transport()
477                ),))?;
478                spec.transport = transport;
479                Ok(())
480            })
481            .await
482            .map_err(|e| AllocatorError::Other(e.into()))?;
483        }
484
485        let alloc =
486            RemoteProcessAlloc::new(spec, AllocName(self.alloc_name.clone()), port, initializer)
487                .await
488                .map_err(|e| {
489                    tracing::error!("failed to allocate: {e:?}");
490                    e
491                })?;
492        Ok(alloc)
493    }
494}
495
496#[pymethods]
497impl PyRemoteAllocator {
498    #[new]
499    #[pyo3(signature = (
500        alloc_name,
501        initializer,
502    ))]
503    fn new(alloc_name: String, initializer: Py<PyAny>) -> PyResult<Self> {
504        Ok(Self {
505            alloc_name,
506            initializer,
507        })
508    }
509
510    fn allocate_nonblocking(&self, spec: &PyAllocSpec) -> PyResult<PyPythonTask> {
511        let spec = spec.into();
512        let mut cloned = self.clone();
513
514        PyPythonTask::new(async move {
515            cloned
516                .allocate(spec)
517                .await
518                .map(|alloc| PyAlloc::new(Box::new(alloc), None))
519                .map_err(|e| PyRuntimeError::new_err(format!("{}", e)))
520        })
521    }
522}
523
524pub fn register_python_bindings(hyperactor_mod: &Bound<'_, PyModule>) -> PyResult<()> {
525    hyperactor_mod.add_class::<PyAlloc>()?;
526    hyperactor_mod.add_class::<PyAllocConstraints>()?;
527    hyperactor_mod.add_class::<PyAllocSpec>()?;
528    hyperactor_mod.add_class::<PyProcessAllocator>()?;
529    hyperactor_mod.add_class::<PyLocalAllocator>()?;
530    hyperactor_mod.add_class::<PyRemoteAllocator>()?;
531
532    Ok(())
533}