1use std::hash::DefaultHasher;
10use std::hash::Hash;
11use std::hash::Hasher;
12use std::ops::Deref;
13use std::sync::Arc;
14
15use hyperactor::Mailbox;
16use hyperactor::OncePortHandle;
17use hyperactor::PortHandle;
18use hyperactor::accum::Accumulator;
19use hyperactor::accum::CommReducer;
20use hyperactor::accum::ReducerFactory;
21use hyperactor::accum::ReducerSpec;
22use hyperactor::mailbox::MailboxSender;
23use hyperactor::mailbox::MessageEnvelope;
24use hyperactor::mailbox::OncePortReceiver;
25use hyperactor::mailbox::PortReceiver;
26use hyperactor::mailbox::Undeliverable;
27use hyperactor::mailbox::monitored_return_handle;
28use hyperactor::message::Bind;
29use hyperactor::message::Bindings;
30use hyperactor::message::Unbind;
31use hyperactor::reference;
32use hyperactor_config::Flattrs;
33use monarch_types::PickledPyObject;
34use monarch_types::py_global;
35use pyo3::IntoPyObjectExt;
36use pyo3::exceptions::PyEOFError;
37use pyo3::exceptions::PyRuntimeError;
38use pyo3::exceptions::PyValueError;
39use pyo3::prelude::*;
40use pyo3::types::PyTuple;
41use pyo3::types::PyType;
42use serde::Deserialize;
43use serde::Serialize;
44use typeuri::Named;
45
46use crate::actor::PythonMessage;
47use crate::actor::PythonMessageKind;
48use crate::context::PyInstance;
49use crate::proc::PyActorId;
50use crate::pytokio::PyPythonTask;
51use crate::pytokio::PythonTask;
52use crate::runtime::monarch_with_gil;
53use crate::runtime::monarch_with_gil_blocking;
54
/// Python-visible wrapper around a hyperactor [`Mailbox`], exposed to Python
/// under the class name `Mailbox`.
#[derive(Clone, Debug)]
#[pyclass(
    name = "Mailbox",
    module = "monarch._rust_bindings.monarch_hyperactor.mailbox"
)]
pub struct PyMailbox {
    /// The wrapped hyperactor mailbox.
    pub(super) inner: Mailbox,
}
63
64impl PyMailbox {
65 pub fn get_inner(&self) -> &Mailbox {
66 &self.inner
67 }
68}
69
70#[pymethods]
71impl PyMailbox {
72 fn open_port<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyTuple>> {
73 let (handle, receiver) = self.inner.open_port();
74 let handle = Py::new(py, PythonPortHandle { inner: handle })?;
75 let receiver = Py::new(
76 py,
77 PythonPortReceiver {
78 inner: Arc::new(tokio::sync::Mutex::new(receiver)),
79 },
80 )?;
81 PyTuple::new(py, vec![handle.into_any(), receiver.into_any()])
82 }
83
84 fn open_once_port<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyTuple>> {
85 let (handle, receiver) = self.inner.open_once_port();
86 let handle = Py::new(
87 py,
88 PythonOncePortHandle {
89 inner: Some(handle),
90 },
91 )?;
92 let receiver = Py::new(
93 py,
94 PythonOncePortReceiver {
95 inner: Arc::new(std::sync::Mutex::new(Some(receiver))),
96 },
97 )?;
98 PyTuple::new(py, vec![handle.into_any(), receiver.into_any()])
99 }
100
101 fn open_accum_port<'py>(
102 &self,
103 py: Python<'py>,
104 accumulator: Py<PyAny>,
105 ) -> PyResult<Bound<'py, PyTuple>> {
106 let py_accumulator = PythonAccumulator::new(py, accumulator)?;
107 let (handle, receiver) = self.inner.open_accum_port(py_accumulator);
108 let handle = Py::new(py, PythonPortHandle { inner: handle })?;
109 let receiver = Py::new(
110 py,
111 PythonPortReceiver {
112 inner: Arc::new(tokio::sync::Mutex::new(receiver)),
113 },
114 )?;
115 PyTuple::new(py, vec![handle.into_any(), receiver.into_any()])
116 }
117
118 pub(super) fn post(&self, dest: &PyActorId, message: &PythonMessage) -> PyResult<()> {
119 let port_id = dest.inner.port_id(PythonMessage::port());
120 let message = wirevalue::Any::serialize(message).map_err(|err| {
121 PyRuntimeError::new_err(format!(
122 "failed to serialize message ({:?}) to Any: {}",
123 message, err
124 ))
125 })?;
126 let envelope = MessageEnvelope::new(
127 self.inner.actor_id().clone(),
128 port_id,
129 message,
130 Flattrs::new(),
131 );
132 let return_handle = self
133 .inner
134 .bound_return_handle()
135 .unwrap_or(monitored_return_handle());
136 self.inner.post(envelope, return_handle);
137 Ok(())
138 }
139
140 #[getter]
141 pub(super) fn actor_id(&self) -> PyActorId {
142 PyActorId {
143 inner: self.inner.actor_id().clone(),
144 }
145 }
146
147 fn __repr__(&self) -> String {
148 format!("{:?}", self.inner)
149 }
150}
151
/// Frozen Python-visible wrapper around a hyperactor [`reference::PortId`],
/// exposed to Python under the class name `PortId`.
#[pyclass(
    frozen,
    name = "PortId",
    module = "monarch._rust_bindings.monarch_hyperactor.mailbox"
)]
#[derive(Clone)]
pub struct PyPortId {
    /// The wrapped port id.
    inner: reference::PortId,
}
161
162impl From<reference::PortId> for PyPortId {
163 fn from(port_id: reference::PortId) -> Self {
164 Self { inner: port_id }
165 }
166}
167
168impl From<PyPortId> for reference::PortId {
169 fn from(port_id: PyPortId) -> Self {
170 port_id.inner
171 }
172}
173
174impl From<Mailbox> for PyMailbox {
175 fn from(inner: Mailbox) -> Self {
176 PyMailbox { inner }
177 }
178}
179
180#[pymethods]
181impl PyPortId {
182 #[new]
183 #[pyo3(signature = (*, actor_id, port))]
184 fn new(actor_id: &PyActorId, port: u64) -> Self {
185 Self {
186 inner: reference::PortId::new(actor_id.inner.clone(), port),
187 }
188 }
189
190 #[staticmethod]
191 fn from_string(port_id: &str) -> PyResult<Self> {
192 Ok(Self {
193 inner: port_id.parse().map_err(|e| {
194 PyValueError::new_err(format!("Failed to parse port id '{}': {}", port_id, e))
195 })?,
196 })
197 }
198
199 #[getter]
200 fn actor_id(&self) -> PyActorId {
201 PyActorId {
202 inner: self.inner.actor_id().clone(),
203 }
204 }
205
206 #[getter]
207 fn index(&self) -> u64 {
208 self.inner.index()
209 }
210
211 fn __repr__(&self) -> String {
212 self.inner.to_string()
213 }
214
215 fn __hash__(&self) -> u64 {
216 let mut hasher = DefaultHasher::new();
217 self.inner.to_string().hash(&mut hasher);
218 hasher.finish()
219 }
220
221 fn __eq__(&self, other: &Bound<'_, PyAny>) -> PyResult<bool> {
222 if let Ok(other) = other.extract::<PyPortId>() {
223 Ok(self.inner == other.inner)
224 } else {
225 Ok(false)
226 }
227 }
228
229 fn __reduce__<'py>(slf: &Bound<'py, Self>) -> PyResult<(Bound<'py, PyAny>, (String,))> {
230 Ok((slf.getattr("from_string")?, (slf.borrow().__repr__(),)))
231 }
232}
233
/// Formats the wrapper by delegating to the wrapped port id's `fmt`, so no
/// extra `PyPortId { .. }` layer appears in the output.
impl std::fmt::Debug for PyPortId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        self.inner.fmt(f)
    }
}
239
/// Python-visible wrapper around a `PortHandle<PythonMessage>`, exposed to
/// Python under the class name `PortHandle`.
#[derive(Clone, Debug)]
#[pyclass(
    name = "PortHandle",
    module = "monarch._rust_bindings.monarch_hyperactor.mailbox"
)]
pub(crate) struct PythonPortHandle {
    /// The wrapped port handle.
    inner: PortHandle<PythonMessage>,
}
248
249impl PythonPortHandle {
250 pub(crate) fn new(inner: PortHandle<PythonMessage>) -> Self {
251 Self { inner }
252 }
253}
254
255#[pymethods]
256impl PythonPortHandle {
257 fn send(&self, instance: &PyInstance, message: PythonMessage) -> PyResult<()> {
258 self.inner
259 .send(instance.deref(), message)
260 .map_err(|err| PyErr::new::<PyEOFError, _>(format!("Port closed: {}", err)))?;
261 Ok(())
262 }
263
264 fn bind(&self) -> PythonPortRef {
265 PythonPortRef {
266 inner: self.inner.bind(),
267 }
268 }
269}
270
/// Python-visible, serializable wrapper around a
/// `reference::PortRef<PythonMessage>`, exposed to Python under the class
/// name `PortRef`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[pyclass(
    name = "PortRef",
    module = "monarch._rust_bindings.monarch_hyperactor.mailbox"
)]
pub struct PythonPortRef {
    /// The wrapped port reference.
    pub(crate) inner: reference::PortRef<PythonMessage>,
}
279
280#[pymethods]
281impl PythonPortRef {
282 #[new]
283 fn new(port: PyPortId) -> Self {
284 Self {
285 inner: reference::PortRef::attest(port.into()),
286 }
287 }
288 fn __reduce__<'py>(
289 slf: Bound<'py, PythonPortRef>,
290 ) -> PyResult<(Bound<'py, PyType>, (PyPortId,))> {
291 let id: PyPortId = (*slf.borrow()).inner.port_id().clone().into();
292 Ok((slf.get_type(), (id,)))
293 }
294
295 fn send(&self, instance: &PyInstance, message: PythonMessage) -> PyResult<()> {
296 self.inner
297 .send(instance.deref(), message)
298 .map_err(|err| PyErr::new::<PyEOFError, _>(format!("Port closed: {}", err)))?;
299 Ok(())
300 }
301
302 fn __repr__(&self) -> String {
303 self.inner.to_string()
304 }
305
306 #[getter]
307 fn port_id(&self) -> PyResult<PyPortId> {
308 Ok(self.inner.port_id().clone().into())
309 }
310
311 #[getter]
312 fn get_return_undeliverable(&self) -> bool {
313 self.inner.get_return_undeliverable()
314 }
315
316 #[setter]
317 fn set_return_undeliverable(&mut self, return_undeliverable: bool) {
318 self.inner.return_undeliverable(return_undeliverable);
319 }
320}
321
322impl From<reference::PortRef<PythonMessage>> for PythonPortRef {
323 fn from(port_ref: reference::PortRef<PythonMessage>) -> Self {
324 Self { inner: port_ref }
325 }
326}
327
/// Python-visible receiving end of a port, exposed to Python under the class
/// name `PortReceiver`.
#[derive(Debug)]
#[pyclass(
    name = "PortReceiver",
    module = "monarch._rust_bindings.monarch_hyperactor.mailbox"
)]
pub(super) struct PythonPortReceiver {
    /// The wrapped receiver, behind an `Arc` and an async mutex so that each
    /// receive task can hold its own clone and lock it across an `.await`.
    inner: Arc<tokio::sync::Mutex<PortReceiver<PythonMessage>>>,
}
336
337async fn recv_async(
338 receiver: Arc<tokio::sync::Mutex<PortReceiver<PythonMessage>>>,
339) -> PyResult<Py<PyAny>> {
340 let message = receiver
341 .lock()
342 .await
343 .recv()
344 .await
345 .map_err(|err| PyErr::new::<PyEOFError, _>(format!("Port closed: {}", err)))?;
346
347 monarch_with_gil(|py| message.into_py_any(py)).await
348}
349
350#[pymethods]
351impl PythonPortReceiver {
352 fn recv_task(&mut self) -> PyResult<PyPythonTask> {
353 let receiver = self.inner.clone();
354 Ok(PythonTask::new(recv_async(receiver))?.into())
355 }
356}
357
358impl PythonPortReceiver {
359 #[allow(dead_code)]
360 pub(super) fn inner(&self) -> Arc<tokio::sync::Mutex<PortReceiver<PythonMessage>>> {
361 Arc::clone(&self.inner)
362 }
363}
364
/// Python-visible wrapper around an `Undeliverable<MessageEnvelope>`, exposed
/// to Python under the class name `UndeliverableMessageEnvelope`.
#[derive(Debug)]
#[pyclass(
    name = "UndeliverableMessageEnvelope",
    module = "monarch._rust_bindings.monarch_hyperactor.mailbox"
)]
pub(crate) struct PythonUndeliverableMessageEnvelope {
    /// The wrapped envelope; `None` once it has been moved out with `take`.
    pub(crate) inner: Option<Undeliverable<MessageEnvelope>>,
}
373
374impl PythonUndeliverableMessageEnvelope {
375 fn inner(&self) -> PyResult<&Undeliverable<MessageEnvelope>> {
376 self.inner.as_ref().ok_or_else(|| {
377 PyErr::new::<PyRuntimeError, _>(
378 "PythonUndeliverableMessageEnvelope was already consumed",
379 )
380 })
381 }
382
383 pub(crate) fn take(&mut self) -> anyhow::Result<Undeliverable<MessageEnvelope>> {
384 self.inner.take().ok_or_else(|| {
385 anyhow::anyhow!("PythonUndeliverableMessageEnvelope was already consumed")
386 })
387 }
388}
389
390#[pymethods]
391impl PythonUndeliverableMessageEnvelope {
392 fn __repr__(&self) -> PyResult<String> {
393 Ok(format!(
394 "UndeliverableMessageEnvelope(sender={}, dest={}, error={})",
395 self.inner()?.0.sender(),
396 self.inner()?.0.dest(),
397 self.error_msg()?
398 ))
399 }
400
401 fn sender(&self) -> PyResult<PyActorId> {
402 Ok(PyActorId {
403 inner: self.inner()?.0.sender().clone(),
404 })
405 }
406
407 fn dest(&self) -> PyResult<PyPortId> {
408 Ok(self.inner()?.0.dest().clone().into())
409 }
410
411 fn error_msg(&self) -> PyResult<String> {
412 Ok(self
413 .inner()?
414 .0
415 .error_msg()
416 .unwrap_or_else(|| "None".to_string()))
417 }
418}
419
/// Python-visible wrapper around an `OncePortHandle<PythonMessage>`, exposed
/// to Python under the class name `OncePortHandle`.
#[derive(Debug)]
#[pyclass(
    name = "OncePortHandle",
    module = "monarch._rust_bindings.monarch_hyperactor.mailbox"
)]
pub(super) struct PythonOncePortHandle {
    /// The wrapped handle; `None` once `send` or `bind` has consumed it.
    inner: Option<OncePortHandle<PythonMessage>>,
}
428
429#[pymethods]
430impl PythonOncePortHandle {
431 fn send(&mut self, instance: &PyInstance, message: PythonMessage) -> PyResult<()> {
432 let Some(port) = self.inner.take() else {
433 return Err(PyErr::new::<PyValueError, _>("OncePort is already used"));
434 };
435 port.send(instance.deref(), message)
436 .map_err(|err| PyErr::new::<PyEOFError, _>(format!("Port closed: {}", err)))?;
437 Ok(())
438 }
439
440 fn bind(&mut self) -> PyResult<PythonOncePortRef> {
441 let Some(port) = self.inner.take() else {
442 return Err(PyErr::new::<PyValueError, _>("OncePort is already used"));
443 };
444 Ok(PythonOncePortRef {
445 inner: Some(port.bind()),
446 })
447 }
448}
449
/// Python-visible, serializable wrapper around a
/// `reference::OncePortRef<PythonMessage>`, exposed to Python under the class
/// name `OncePortRef`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[pyclass(
    name = "OncePortRef",
    module = "monarch._rust_bindings.monarch_hyperactor.mailbox"
)]
pub struct PythonOncePortRef {
    /// The wrapped reference; `None` when constructed without a port id or
    /// once `send` has consumed it.
    pub(crate) inner: Option<reference::OncePortRef<PythonMessage>>,
}
458
459#[pymethods]
460impl PythonOncePortRef {
461 #[new]
462 fn new(port: Option<PyPortId>) -> Self {
463 Self {
464 inner: port.map(|port| reference::PortRef::attest(port.inner).into_once()),
465 }
466 }
467 fn __reduce__<'py>(
468 slf: Bound<'py, PythonOncePortRef>,
469 ) -> PyResult<(Bound<'py, PyType>, (Option<PyPortId>,))> {
470 let id: Option<PyPortId> = (*slf.borrow())
471 .inner
472 .as_ref()
473 .map(|x| x.port_id().clone().into());
474 Ok((slf.get_type(), (id,)))
475 }
476
477 fn send(&mut self, instance: &PyInstance, message: PythonMessage) -> PyResult<()> {
478 let Some(port_ref) = self.inner.take() else {
479 return Err(PyErr::new::<PyValueError, _>("OncePortRef is already used"));
480 };
481 port_ref
482 .send(instance.deref(), message)
483 .map_err(|err| PyErr::new::<PyEOFError, _>(format!("Port closed: {}", err)))?;
484 Ok(())
485 }
486
487 fn __repr__(&self) -> String {
488 self.inner
489 .as_ref()
490 .map_or("OncePortRef is already used".to_string(), |r| r.to_string())
491 }
492
493 #[getter]
494 fn port_id(&self) -> PyResult<PyPortId> {
495 Ok(self.inner.as_ref().unwrap().port_id().clone().into())
496 }
497
498 #[getter]
499 fn get_return_undeliverable(&self) -> bool {
500 self.inner.as_ref().unwrap().get_return_undeliverable()
501 }
502
503 #[setter]
504 fn set_return_undeliverable(&mut self, return_undeliverable: bool) {
505 if let Some(ref mut inner) = self.inner {
506 inner.return_undeliverable(return_undeliverable);
507 }
508 }
509}
510
511impl From<reference::OncePortRef<PythonMessage>> for PythonOncePortRef {
512 fn from(port_ref: reference::OncePortRef<PythonMessage>) -> Self {
513 Self {
514 inner: Some(port_ref),
515 }
516 }
517}
518
/// Python-visible receiving end of a once port, exposed to Python under the
/// class name `OncePortReceiver`.
#[pyclass(
    name = "OncePortReceiver",
    module = "monarch._rust_bindings.monarch_hyperactor.mailbox"
)]
pub(super) struct PythonOncePortReceiver {
    /// The wrapped receiver; the `Option` becomes `None` once `recv_task` has
    /// taken it.
    inner: Arc<std::sync::Mutex<Option<OncePortReceiver<PythonMessage>>>>,
}
526
527#[pymethods]
528impl PythonOncePortReceiver {
529 fn recv_task(&mut self) -> PyResult<PyPythonTask> {
530 let Some(receiver) = self.inner.lock().unwrap().take() else {
531 return Err(PyErr::new::<PyValueError, _>("OncePort is already used"));
532 };
533 let fut = async move {
534 let message = receiver
535 .recv()
536 .await
537 .map_err(|err| PyErr::new::<PyEOFError, _>(format!("Port closed: {}", err)))?;
538
539 monarch_with_gil(|py| message.into_py_any(py)).await
540 };
541 Ok(PythonTask::new(fut)?.into())
542 }
543}
544
545impl PythonOncePortReceiver {
546 #[allow(dead_code)]
547 pub(super) fn inner(&self) -> Arc<std::sync::Mutex<Option<OncePortReceiver<PythonMessage>>>> {
548 Arc::clone(&self.inner)
549 }
550}
551
/// A port reference that is either a [`PythonPortRef`] or a single-use
/// [`PythonOncePortRef`].
#[derive(
    Clone,
    Serialize,
    Deserialize,
    Named,
    PartialEq,
    FromPyObject,
    IntoPyObject,
    Debug
)]
pub enum EitherPortRef {
    /// Reference to a port whose `send` does not consume the reference.
    Unbounded(PythonPortRef),
    /// Reference to a once port; sending takes the wrapped reference.
    Once(PythonOncePortRef),
}
566
567impl Unbind for EitherPortRef {
568 fn unbind(&self, bindings: &mut Bindings) -> anyhow::Result<()> {
569 match self {
570 EitherPortRef::Unbounded(port_ref) => port_ref.inner.unbind(bindings),
571 EitherPortRef::Once(once_port_ref) => once_port_ref.inner.unbind(bindings),
572 }
573 }
574}
575
576impl Bind for EitherPortRef {
577 fn bind(&mut self, bindings: &mut Bindings) -> anyhow::Result<()> {
578 match self {
579 EitherPortRef::Unbounded(port_ref) => port_ref.inner.bind(bindings),
580 EitherPortRef::Once(once_port_ref) => once_port_ref.inner.bind(bindings),
581 }
582 }
583}
584
585impl EitherPortRef {
586 pub fn get_return_undeliverable(&self) -> bool {
587 match self {
588 EitherPortRef::Unbounded(port_ref) => port_ref.inner.get_return_undeliverable(),
589 EitherPortRef::Once(once_port_ref) => once_port_ref
590 .inner
591 .as_ref()
592 .is_some_and(|r| r.get_return_undeliverable()),
593 }
594 }
595
596 pub fn set_return_undeliverable(&mut self, return_undeliverable: bool) {
597 match self {
598 EitherPortRef::Unbounded(port_ref) => {
599 port_ref.inner.return_undeliverable(return_undeliverable);
600 }
601 EitherPortRef::Once(once_port_ref) => {
602 if let Some(ref mut inner) = once_port_ref.inner {
603 inner.return_undeliverable(return_undeliverable);
604 }
605 }
606 }
607 }
608
609 pub fn send(
612 &mut self,
613 cx: &impl hyperactor::context::Actor,
614 message: crate::actor::PythonMessage,
615 ) -> anyhow::Result<()> {
616 match self {
617 EitherPortRef::Unbounded(port_ref) => port_ref.inner.send(cx, message)?,
618 EitherPortRef::Once(once_port_ref) => {
619 let port = once_port_ref
620 .inner
621 .take()
622 .ok_or_else(|| anyhow::anyhow!("OncePortRef already used"))?;
623 port.send(cx, message)?;
624 }
625 }
626 Ok(())
627 }
628}
629
/// Reducer backed by a Python object, which `reduce` calls with the
/// `(left, right)` pair of updates.
#[derive(Debug, Named)]
struct PythonReducer(Py<PyAny>);
632
633impl PythonReducer {
634 fn new(params: Option<wirevalue::Any>) -> anyhow::Result<Self> {
635 let p = params.ok_or_else(|| anyhow::anyhow!("params cannot be None"))?;
636 let obj: PickledPyObject = p.deserialized()?;
637 Ok(monarch_with_gil_blocking(
638 |py: Python<'_>| -> PyResult<Self> {
639 let unpickled = obj.unpickle(py)?;
640 Ok(Self(unpickled.unbind()))
641 },
642 )?)
643 }
644}
645
646impl CommReducer for PythonReducer {
647 type Update = PythonMessage;
648
649 fn reduce(&self, left: Self::Update, right: Self::Update) -> anyhow::Result<Self::Update> {
650 monarch_with_gil_blocking(|py: Python<'_>| -> PyResult<PythonMessage> {
651 let result = self.0.call(py, (left, right), None)?;
652 result.extract::<PythonMessage>(py)
653 })
654 .map_err(Into::into)
655 }
656}
657
/// Accumulator backed by a Python object.
struct PythonAccumulator {
    /// The Python accumulator object. It is called with `(state, update)` and
    /// its `initial_state` attribute seeds an uninitialized state.
    accumulator: Py<PyAny>,
    /// The accumulator's `reducer` attribute, cloudpickled and serialized;
    /// `None` when that attribute is Python `None`.
    reducer: Option<wirevalue::Any>,
}
662
663impl PythonAccumulator {
664 fn new<'py>(py: Python<'py>, accumulator: Py<PyAny>) -> PyResult<Self> {
665 let py_reducer = accumulator.getattr(py, "reducer")?;
666 let reducer: Option<wirevalue::Any> = if py_reducer.is_none(py) {
667 None
668 } else {
669 let pickled = PickledPyObject::cloudpickle(py_reducer.bind(py))?;
670 Some(
671 wirevalue::Any::serialize(&pickled)
672 .map_err(|e| PyRuntimeError::new_err(e.to_string()))?,
673 )
674 };
675
676 Ok(Self {
677 accumulator,
678 reducer,
679 })
680 }
681}
682
683impl Accumulator for PythonAccumulator {
684 type State = PythonMessage;
685 type Update = PythonMessage;
686
687 fn accumulate(&self, state: &mut Self::State, update: Self::Update) -> anyhow::Result<()> {
688 monarch_with_gil_blocking(|py: Python<'_>| -> PyResult<()> {
689 if matches!(state.kind, PythonMessageKind::Uninit {}) {
691 *state = self
692 .accumulator
693 .getattr(py, "initial_state")?
694 .extract::<PythonMessage>(py)?;
695 }
696
697 let old_state = state.clone();
700 let result = self.accumulator.call(py, (old_state, update), None)?;
701 *state = result.extract::<PythonMessage>(py)?;
702 Ok(())
703 })
704 .map_err(Into::into)
705 }
706
707 fn reducer_spec(&self) -> Option<ReducerSpec> {
708 self.reducer.as_ref().map(|r| ReducerSpec {
709 typehash: <PythonReducer as Named>::typehash(),
710 builder_params: Some(r.clone()),
711 })
712 }
713}
714
// Submits a `ReducerFactory` for `PythonReducer` to `inventory`. The factory
// is identified by the reducer's `Named` typehash, and its builder constructs
// a boxed `PythonReducer` from the serialized parameters.
inventory::submit! {
    ReducerFactory {
        typehash_f: <PythonReducer as Named>::typehash,
        builder_f: |params| Ok(Box::new(PythonReducer::new(params)?)),
    }
}
721
// NOTE(review): presumably declares an accessor named `point` for the Python
// global `Point` in `monarch._src.actor.actor_mesh` — confirm against the
// `monarch_types::py_global` macro definition.
py_global!(point, "monarch._src.actor.actor_mesh", "Point");
723
724pub fn register_python_bindings(hyperactor_mod: &Bound<'_, PyModule>) -> PyResult<()> {
725 hyperactor_mod.add_class::<PyMailbox>()?;
726 hyperactor_mod.add_class::<PyPortId>()?;
727 hyperactor_mod.add_class::<PythonPortHandle>()?;
728 hyperactor_mod.add_class::<PythonPortRef>()?;
729 hyperactor_mod.add_class::<PythonPortReceiver>()?;
730 hyperactor_mod.add_class::<PythonOncePortHandle>()?;
731 hyperactor_mod.add_class::<PythonOncePortRef>()?;
732 hyperactor_mod.add_class::<PythonOncePortReceiver>()?;
733 hyperactor_mod.add_class::<PythonUndeliverableMessageEnvelope>()?;
734 Ok(())
735}