1use std::collections::HashMap;
10use std::str::FromStr;
11use std::sync::Arc;
12
13use anyhow::anyhow;
14use async_trait::async_trait;
15use hyperactor::channel::ChannelAddr;
16use hyperactor::channel::ChannelTransport;
17use hyperactor::channel::TlsAddr;
18use hyperactor_mesh::alloc::Alloc;
19use hyperactor_mesh::alloc::AllocConstraints;
20use hyperactor_mesh::alloc::AllocName;
21use hyperactor_mesh::alloc::AllocSpec;
22use hyperactor_mesh::alloc::Allocator;
23use hyperactor_mesh::alloc::AllocatorError;
24use hyperactor_mesh::alloc::LocalAllocator;
25use hyperactor_mesh::alloc::ProcState;
26use hyperactor_mesh::alloc::ProcessAllocator;
27use hyperactor_mesh::alloc::remoteprocess::RemoteProcessAlloc;
28use hyperactor_mesh::alloc::remoteprocess::RemoteProcessAllocHost;
29use hyperactor_mesh::alloc::remoteprocess::RemoteProcessAllocInitializer;
30use hyperactor_mesh::bootstrap::BootstrapCommand;
31use hyperactor_mesh::transport::default_transport;
32use ndslice::Extent;
33use pyo3::exceptions::PyRuntimeError;
34use pyo3::exceptions::PyValueError;
35use pyo3::prelude::*;
36use pyo3::types::PyDict;
37use tokio::process::Command;
38
39use crate::channel::PyChannelAddr;
40use crate::host_mesh::PyBootstrapCommand;
41use crate::pytokio::PyPythonTask;
42use crate::runtime::get_tokio_runtime;
43use crate::runtime::monarch_with_gil;
44use crate::runtime::monarch_with_gil_blocking;
45
46fn pydict_to_extent(shape: &Bound<'_, PyDict>) -> PyResult<Extent> {
48 let mut labels = Vec::new();
49 let mut sizes = Vec::new();
50 for (key, value) in shape {
51 labels.push(key.extract::<String>()?);
52 sizes.push(value.extract::<usize>()?);
53 }
54 Ok(Extent::new(labels, sizes).unwrap())
55}
56
/// Python-visible handle (exposed as `Alloc`) around a boxed [`Alloc`].
///
/// `inner` is an `Option` so the allocation can be moved out of the handle
/// (see `PyAlloc::take`); once it is gone the handle reprs as `Alloc(None)`.
#[pyclass(
    name = "Alloc",
    module = "monarch._rust_bindings.monarch_hyperactor.alloc"
)]
pub struct PyAlloc {
    /// The wrapped allocation, or `None` once it has been taken.
    pub inner: Option<Box<dyn Alloc + Sync + Send>>,
    /// Bootstrap command supplied when the handle was constructed, if any.
    pub bootstrap_command: Option<BootstrapCommand>,
}
68
69struct ReshapedAlloc {
70 extent: Extent,
71 spec: AllocSpec,
72 base: Box<dyn Alloc + Sync + Send>,
73}
74
75impl ReshapedAlloc {
76 fn new(extent: Extent, base: Box<dyn Alloc + Sync + Send>) -> Self {
77 let mut spec = base.spec().clone();
78 spec.extent = extent.clone();
79 Self { extent, spec, base }
80 }
81}
82
// `extent` and `spec` return the reshaped values stored on the wrapper; every
// other operation is forwarded to the wrapped allocation unchanged.
#[async_trait]
impl Alloc for ReshapedAlloc {
    /// Forwards to the wrapped allocation's `next`.
    async fn next(&mut self) -> Option<ProcState> {
        self.base.next().await
    }

    /// Returns the reshaped extent, not the base allocation's.
    fn extent(&self) -> &Extent {
        &self.extent
    }

    /// Returns the base spec with its extent replaced by the reshaped one.
    fn spec(&self) -> &AllocSpec {
        &self.spec
    }

    /// Returns the wrapped allocation's name.
    fn alloc_name(&self) -> &AllocName {
        self.base.alloc_name()
    }

    /// Forwards to the wrapped allocation's `stop`.
    async fn stop(&mut self) -> Result<(), AllocatorError> {
        self.base.stop().await
    }
}
105
impl PyAlloc {
    /// Creates a handle holding `inner`, recording `bootstrap_command`
    /// alongside it.
    pub fn new(
        inner: Box<dyn Alloc + Sync + Send>,
        bootstrap_command: Option<BootstrapCommand>,
    ) -> Self {
        Self {
            inner: Some(inner),
            bootstrap_command,
        }
    }

    /// Moves the allocation out of the handle, leaving `None` behind.
    ///
    /// Returns `None` if the allocation was already taken.
    pub fn take(&mut self) -> Option<Box<dyn Alloc + Sync + Send>> {
        self.inner.take()
    }
}
123
124#[pymethods]
125impl PyAlloc {
126 fn __repr__(&self) -> PyResult<String> {
127 match &self.inner {
128 None => Ok("Alloc(None)".to_string()),
129 Some(wrapper) => Ok(format!("Alloc({})", wrapper.shape())),
130 }
131 }
132 pub fn reshape(&mut self, shape: &Bound<'_, PyDict>) -> PyResult<Option<PyAlloc>> {
133 let alloc = self.take();
134 alloc
135 .map(|alloc| {
136 let extent = alloc.extent();
137 let old_num_elements = extent.num_ranks();
138
139 let new_extent = pydict_to_extent(shape)?;
141
142 let new_elements = new_extent.num_ranks();
143 if old_num_elements != new_elements {
144 return Err(PyErr::new::<PyValueError, _>(format!(
145 "cannot reshape {} != {}",
146 old_num_elements, new_elements
147 )));
148 }
149 Ok(PyAlloc::new(
150 Box::new(ReshapedAlloc::new(new_extent, alloc)),
151 self.bootstrap_command.clone(),
152 ))
153 })
154 .transpose()
155 }
156}
157
/// Python-visible wrapper (exposed as `AllocConstraints`) around
/// [`AllocConstraints`].
#[pyclass(
    name = "AllocConstraints",
    module = "monarch._rust_bindings.monarch_hyperactor.alloc"
)]
pub struct PyAllocConstraints {
    /// The wrapped Rust constraints.
    inner: AllocConstraints,
}
165
166#[pymethods]
167impl PyAllocConstraints {
168 #[new]
169 #[pyo3(signature = (match_labels=None))]
170 fn new(match_labels: Option<HashMap<String, String>>) -> PyResult<Self> {
171 let mut constraints = AllocConstraints::default();
172 if let Some(match_lables) = match_labels {
173 constraints.match_labels = match_lables;
174 }
175
176 Ok(Self { inner: constraints })
177 }
178
179 #[getter]
180 fn match_labels(&self) -> PyResult<HashMap<String, String>> {
181 Ok(self.inner.match_labels.clone())
182 }
183}
184
/// Python-visible wrapper (exposed as `AllocSpec`) around [`AllocSpec`].
#[pyclass(
    name = "AllocSpec",
    module = "monarch._rust_bindings.monarch_hyperactor.alloc"
)]
pub struct PyAllocSpec {
    /// The wrapped Rust spec.
    inner: AllocSpec,
    /// Optional transport override applied when converting to [`AllocSpec`];
    /// when `None`, the conversion uses `default_transport()`.
    transport: Option<ChannelTransport>,
}
199
200#[pymethods]
201impl PyAllocSpec {
202 #[new]
203 #[pyo3(signature = (constraints, **kwargs))]
204 fn new(constraints: &PyAllocConstraints, kwargs: Option<&Bound<'_, PyDict>>) -> PyResult<Self> {
205 let mut keys = Vec::new();
206 let mut values = Vec::new();
207
208 if let Some(kwargs) = kwargs {
209 for (key, value) in kwargs {
210 keys.push(key.clone());
211 values.push(value.clone());
212 }
213 };
214
215 let extent = Extent::new(
216 keys.into_iter()
217 .map(|key| key.extract::<String>())
218 .collect::<PyResult<Vec<String>>>()?,
219 values
220 .into_iter()
221 .map(|key| key.extract::<usize>())
222 .collect::<PyResult<Vec<usize>>>()?,
223 )
224 .map_err(|e| PyValueError::new_err(format!("Invalid extent: {:?}", e)))?;
225
226 Ok(Self {
227 inner: AllocSpec {
228 extent,
229 constraints: constraints.inner.clone(),
230 proc_name: None,
231 transport: default_transport(),
232 proc_allocation_mode: Default::default(),
233 },
234 transport: None,
235 })
236 }
237
238 #[getter]
239 fn extent<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyDict>> {
240 let d = PyDict::new(py);
241 for (name, size) in self.inner.extent.iter() {
242 d.set_item(name, size)?;
243 }
244 Ok(d)
245 }
246
247 #[getter]
248 fn constraints(&self) -> PyResult<PyAllocConstraints> {
249 Ok(PyAllocConstraints {
250 inner: self.inner.constraints.clone(),
251 })
252 }
253}
254
255impl From<&PyAllocSpec> for AllocSpec {
256 fn from(spec: &PyAllocSpec) -> Self {
257 let mut inner = spec.inner.clone();
258 inner.transport = spec.transport.clone().unwrap_or_else(default_transport);
259 inner
260 }
261}
262
/// Python-visible, subclassable base (exposed as `LocalAllocatorBase`) whose
/// allocations are served by [`LocalAllocator`].
#[pyclass(
    name = "LocalAllocatorBase",
    module = "monarch._rust_bindings.monarch_hyperactor.alloc",
    subclass
)]
pub struct PyLocalAllocator;
269
270#[pymethods]
271impl PyLocalAllocator {
272 #[new]
273 fn new() -> Self {
274 PyLocalAllocator {}
275 }
276
277 fn allocate_nonblocking(&self, spec: &PyAllocSpec) -> PyResult<PyPythonTask> {
278 let spec = spec.into();
282 PyPythonTask::new(async move {
283 LocalAllocator
284 .allocate(spec)
285 .await
286 .map(|inner| PyAlloc::new(Box::new(inner), None))
287 .map_err(|e| PyRuntimeError::new_err(format!("{}", e)))
288 })
289 }
290}
291
/// Python-visible, subclassable base (exposed as `ProcessAllocatorBase`)
/// around a [`ProcessAllocator`].
#[pyclass(
    name = "ProcessAllocatorBase",
    module = "monarch._rust_bindings.monarch_hyperactor.alloc",
    subclass
)]
pub struct PyProcessAllocator {
    /// The allocator, shared with spawned allocation tasks; the async mutex
    /// is locked for the duration of each `allocate` call.
    inner: Arc<tokio::sync::Mutex<ProcessAllocator>>,
}
300
301#[pymethods]
302impl PyProcessAllocator {
303 #[new]
304 #[pyo3(signature = (cmd, args=None, env=None))]
305 fn new(cmd: String, args: Option<Vec<String>>, env: Option<HashMap<String, String>>) -> Self {
306 let mut cmd = Command::new(cmd);
307 if let Some(args) = args {
308 cmd.args(args);
309 }
310 if let Some(env) = env {
311 cmd.envs(env);
312 }
313 Self {
314 inner: Arc::new(tokio::sync::Mutex::new(ProcessAllocator::new(cmd))),
315 }
316 }
317
318 fn allocate_nonblocking(&self, py: Python<'_>, spec: &PyAllocSpec) -> PyResult<PyPythonTask> {
319 let instance = Arc::clone(&self.inner);
323 let spec = spec.into();
324 let bootstrap_command = PyBootstrapCommand::default(py)?.borrow().to_rust();
325 PyPythonTask::new(async move {
326 instance
327 .lock()
328 .await
329 .allocate(spec)
330 .await
331 .map(|inner| PyAlloc::new(Box::new(inner), Some(bootstrap_command)))
332 .map_err(|e| PyRuntimeError::new_err(format!("{:?}", e)))
333 })
334 }
335}
336
/// Adapter that lets a Python object act as a
/// [`RemoteProcessAllocInitializer`].
pub struct PyRemoteProcessAllocInitializer {
    /// The Python object whose `initialize_alloc` method is invoked.
    py_inner: Py<PyAny>,

    /// Constraints whose `match_labels` are passed to the Python
    /// `initialize_alloc` call.
    constraints: AllocConstraints,
}
348
349impl PyRemoteProcessAllocInitializer {
350 async fn py_initialize_alloc(&self) -> PyResult<Vec<String>> {
356 let args = (&self.constraints.match_labels,);
357 let coro = monarch_with_gil(|py| -> PyResult<Py<PyAny>> {
358 self.py_inner
359 .bind(py)
360 .call_method1("initialize_alloc", args)
361 .map(|x| x.unbind())
362 })
363 .await?;
364 let r = get_tokio_runtime().spawn_blocking(move || -> PyResult<Vec<String>> {
365 monarch_with_gil_blocking(|py| {
367 let asyncio = py.import("asyncio").unwrap();
368 let addrs = asyncio.call_method1("run", (coro,))?;
369 let addrs: PyResult<Vec<String>> = addrs.extract();
370 addrs
371 })
372 });
373
374 r.await
375 .map_err(|err| PyRuntimeError::new_err(err.to_string()))?
376 }
377
378 async fn get_transport_and_port(&self) -> PyResult<(ChannelTransport, u16)> {
379 let addrs = self.py_initialize_alloc().await?;
385 let addr = addrs
386 .first()
387 .ok_or_else(|| anyhow!("initializer must return non-empty list of addresses"))?;
388 let channel_addr = PyChannelAddr::parse(addr)?;
389 let port = channel_addr.get_port()?;
390 let transport = channel_addr.get_transport()?;
391 Ok((transport.into(), port))
392 }
393}
394
395#[async_trait]
396impl RemoteProcessAllocInitializer for PyRemoteProcessAllocInitializer {
397 async fn initialize_alloc(&mut self) -> Result<Vec<RemoteProcessAllocHost>, anyhow::Error> {
398 let addrs = self.py_initialize_alloc().await?;
400 addrs
401 .iter()
402 .map(|channel_addr| {
403 let addr = ChannelAddr::from_str(channel_addr)?;
404 let (id, hostname) = match addr {
405 ChannelAddr::Tcp(socket) => {
406 if socket.is_ipv6() {
407 let ipv6_addr = format!("[{}]", socket.ip());
410 (ipv6_addr.clone(), ipv6_addr.clone())
411 } else {
412 let ipv4_addr = socket.ip().to_string();
413 (ipv4_addr.clone(), ipv4_addr.clone())
414 }
415 }
416 ChannelAddr::MetaTls(TlsAddr { hostname, .. }) => {
417 (hostname.clone(), hostname.clone())
418 }
419 ChannelAddr::Unix(_) => (addr.to_string(), addr.to_string()),
420 _ => anyhow::bail!("unsupported transport for channel address: `{addr}`"),
421 };
422 Ok(RemoteProcessAllocHost { id, hostname })
423 })
424 .collect()
425 }
426}
427
/// Python-visible, subclassable base (exposed as `RemoteAllocatorBase`) that
/// allocates through a Python-provided initializer object.
#[pyclass(
    name = "RemoteAllocatorBase",
    module = "monarch._rust_bindings.monarch_hyperactor.alloc",
    subclass
)]
pub struct PyRemoteAllocator {
    /// Name given to each allocation created by this allocator.
    alloc_name: String,
    /// Python object handed to `PyRemoteProcessAllocInitializer` on each
    /// allocation.
    initializer: Py<PyAny>,
}
437
438impl Clone for PyRemoteAllocator {
439 fn clone(&self) -> Self {
440 Self {
441 alloc_name: self.alloc_name.clone(),
442 initializer: monarch_with_gil_blocking(|py| Py::clone_ref(&self.initializer, py)),
443 }
444 }
445}
#[async_trait]
impl Allocator for PyRemoteAllocator {
    type Alloc = RemoteProcessAlloc;

    /// Allocates a [`RemoteProcessAlloc`] for `spec` using the Python
    /// initializer.
    ///
    /// The initializer is queried once up front to learn the transport and
    /// port of the first address it returns. If that transport differs from
    /// `spec.transport`, a Python warning is issued and the spec's transport
    /// is overwritten with the initializer's.
    ///
    /// # Errors
    ///
    /// Failures from the initializer query or from issuing the warning are
    /// wrapped in `AllocatorError::Other`; errors from
    /// `RemoteProcessAlloc::new` are logged and returned unchanged.
    async fn allocate(&mut self, spec: AllocSpec) -> Result<Self::Alloc, AllocatorError> {
        // Each allocation gets its own reference to the Python initializer.
        let py_inner = monarch_with_gil(|py| Py::clone_ref(&self.initializer, py)).await;
        let constraints = spec.constraints.clone();
        let initializer = PyRemoteProcessAllocInitializer {
            py_inner,
            constraints,
        };

        let (transport, port) = initializer
            .get_transport_and_port()
            .await
            .map_err(|e| AllocatorError::Other(e.into()))?;

        let mut spec = spec;
        if spec.transport != transport {
            monarch_with_gil(|py| -> PyResult<()> {
                py.import("warnings")?.getattr("warn")?.call1((format!(
                    "The AllocSpec passed to RemoteAllocator.allocate has transport {}, \
                     but the transport from the remote process alloc initializer is {}. \
                     This will soon be an error unless you explicitly configure monarch's \
                     default transport to {}. The current default transport is {}.",
                    spec.transport,
                    transport,
                    transport,
                    default_transport()
                ),))?;
                // Only reached when the warning call itself succeeded; if it
                // returned an error, the `?` above exits first and the whole
                // allocation fails.
                spec.transport = transport;
                Ok(())
            })
            .await
            .map_err(|e| AllocatorError::Other(e.into()))?;
        }

        // NOTE(review): the same initializer is handed on here, so it is
        // presumably invoked again by `RemoteProcessAlloc` — confirm.
        let alloc =
            RemoteProcessAlloc::new(spec, AllocName(self.alloc_name.clone()), port, initializer)
                .await
                .map_err(|e| {
                    tracing::error!("failed to allocate: {e:?}");
                    e
                })?;
        Ok(alloc)
    }
}
495
496#[pymethods]
497impl PyRemoteAllocator {
498 #[new]
499 #[pyo3(signature = (
500 alloc_name,
501 initializer,
502 ))]
503 fn new(alloc_name: String, initializer: Py<PyAny>) -> PyResult<Self> {
504 Ok(Self {
505 alloc_name,
506 initializer,
507 })
508 }
509
510 fn allocate_nonblocking(&self, spec: &PyAllocSpec) -> PyResult<PyPythonTask> {
511 let spec = spec.into();
512 let mut cloned = self.clone();
513
514 PyPythonTask::new(async move {
515 cloned
516 .allocate(spec)
517 .await
518 .map(|alloc| PyAlloc::new(Box::new(alloc), None))
519 .map_err(|e| PyRuntimeError::new_err(format!("{}", e)))
520 })
521 }
522}
523
/// Registers this file's Python classes on `hyperactor_mod`.
///
/// # Errors
///
/// Returns the first error raised while adding a class to the module.
pub fn register_python_bindings(hyperactor_mod: &Bound<'_, PyModule>) -> PyResult<()> {
    hyperactor_mod.add_class::<PyAlloc>()?;
    hyperactor_mod.add_class::<PyAllocConstraints>()?;
    hyperactor_mod.add_class::<PyAllocSpec>()?;
    hyperactor_mod.add_class::<PyProcessAllocator>()?;
    hyperactor_mod.add_class::<PyLocalAllocator>()?;
    hyperactor_mod.add_class::<PyRemoteAllocator>()?;

    Ok(())
}