1use std::hash::DefaultHasher;
10use std::hash::Hash;
11use std::hash::Hasher;
12use std::ops::Deref;
13use std::sync::Arc;
14
15use hyperactor::Endpoint as _;
16use hyperactor::Mailbox;
17use hyperactor::OncePortHandle;
18use hyperactor::PortHandle;
19use hyperactor::RemoteEndpoint as _;
20use hyperactor::accum::Accumulator;
21use hyperactor::accum::CommReducer;
22use hyperactor::accum::ReducerFactory;
23use hyperactor::accum::ReducerSpec;
24use hyperactor::mailbox::MailboxSender;
25use hyperactor::mailbox::MessageEnvelope;
26use hyperactor::mailbox::OncePortReceiver;
27use hyperactor::mailbox::PortReceiver;
28use hyperactor::mailbox::Undeliverable;
29use hyperactor::mailbox::monitored_return_handle;
30use hyperactor::message::Bind;
31use hyperactor::message::Bindings;
32use hyperactor::message::Unbind;
33use hyperactor_config::Flattrs;
34use monarch_types::PickledPyObject;
35use monarch_types::py_global;
36use pyo3::IntoPyObjectExt;
37use pyo3::exceptions::PyEOFError;
38use pyo3::exceptions::PyRuntimeError;
39use pyo3::exceptions::PyValueError;
40use pyo3::prelude::*;
41use pyo3::types::PyTuple;
42use pyo3::types::PyType;
43use serde::Deserialize;
44use serde::Serialize;
45use typeuri::Named;
46
47use crate::actor::PythonMessage;
48use crate::actor::PythonMessageKind;
49use crate::context::PyInstance;
50use crate::proc::PyActorAddr;
51use crate::pytokio::PyPythonTask;
52use crate::pytokio::PythonTask;
53use crate::runtime::monarch_with_gil;
54use crate::runtime::monarch_with_gil_blocking;
55
/// Wrapper that exposes a hyperactor [`Mailbox`] to Python.
///
/// Registered with Python under the name `Mailbox` in
/// `monarch._rust_bindings.monarch_hyperactor.mailbox`.
#[derive(Clone, Debug)]
#[pyclass(
    name = "Mailbox",
    module = "monarch._rust_bindings.monarch_hyperactor.mailbox"
)]
pub struct PyMailbox {
    /// The wrapped mailbox.
    pub(super) inner: Mailbox,
}
64
65impl PyMailbox {
66 pub fn get_inner(&self) -> &Mailbox {
67 &self.inner
68 }
69}
70
71#[pymethods]
72impl PyMailbox {
73 fn open_port<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyTuple>> {
74 let (handle, receiver) = self.inner.open_port();
75 let handle = Py::new(py, PythonPortHandle { inner: handle })?;
76 let receiver = Py::new(
77 py,
78 PythonPortReceiver {
79 inner: Arc::new(tokio::sync::Mutex::new(receiver)),
80 },
81 )?;
82 PyTuple::new(py, vec![handle.into_any(), receiver.into_any()])
83 }
84
85 fn open_once_port<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyTuple>> {
86 let (handle, receiver) = self.inner.open_once_port();
87 let handle = Py::new(
88 py,
89 PythonOncePortHandle {
90 inner: Some(handle),
91 },
92 )?;
93 let receiver = Py::new(
94 py,
95 PythonOncePortReceiver {
96 inner: Arc::new(std::sync::Mutex::new(Some(receiver))),
97 },
98 )?;
99 PyTuple::new(py, vec![handle.into_any(), receiver.into_any()])
100 }
101
102 fn open_accum_port<'py>(
103 &self,
104 py: Python<'py>,
105 accumulator: Py<PyAny>,
106 ) -> PyResult<Bound<'py, PyTuple>> {
107 let py_accumulator = PythonAccumulator::new(py, accumulator)?;
108 let (handle, receiver) = self.inner.open_accum_port(py_accumulator);
109 let handle = Py::new(py, PythonPortHandle { inner: handle })?;
110 let receiver = Py::new(
111 py,
112 PythonPortReceiver {
113 inner: Arc::new(tokio::sync::Mutex::new(receiver)),
114 },
115 )?;
116 PyTuple::new(py, vec![handle.into_any(), receiver.into_any()])
117 }
118
119 pub(super) fn post(&self, dest: &PyActorAddr, message: &PythonMessage) -> PyResult<()> {
120 let port_id = dest.inner.port_addr(PythonMessage::port().into());
121 let message = wirevalue::Any::serialize(message).map_err(|err| {
122 PyRuntimeError::new_err(format!(
123 "failed to serialize message ({:?}) to Any: {}",
124 message, err
125 ))
126 })?;
127 let envelope = MessageEnvelope::new(
128 self.inner.actor_addr().clone(),
129 port_id,
130 message,
131 Flattrs::new(),
132 );
133 let return_handle = self
134 .inner
135 .bound_return_handle()
136 .unwrap_or(monitored_return_handle());
137 self.inner.post(envelope, return_handle);
138 Ok(())
139 }
140
141 #[getter]
142 pub(super) fn actor_id(&self) -> PyActorAddr {
143 PyActorAddr {
144 inner: self.inner.actor_addr().clone(),
145 }
146 }
147
148 fn __repr__(&self) -> String {
149 format!("{:?}", self.inner)
150 }
151}
152
/// Frozen wrapper that exposes a [`hyperactor::PortAddr`] to Python.
///
/// Registered with Python under the name `PortId` in
/// `monarch._rust_bindings.monarch_hyperactor.mailbox`.
#[pyclass(
    frozen,
    name = "PortId",
    module = "monarch._rust_bindings.monarch_hyperactor.mailbox"
)]
#[derive(Clone)]
pub struct PyPortId {
    /// The wrapped port address.
    inner: hyperactor::PortAddr,
}
162
163impl From<hyperactor::PortAddr> for PyPortId {
164 fn from(port_id: hyperactor::PortAddr) -> Self {
165 Self { inner: port_id }
166 }
167}
168
169impl From<PyPortId> for hyperactor::PortAddr {
170 fn from(port_id: PyPortId) -> Self {
171 port_id.inner
172 }
173}
174
175impl From<Mailbox> for PyMailbox {
176 fn from(inner: Mailbox) -> Self {
177 PyMailbox { inner }
178 }
179}
180
181#[pymethods]
182impl PyPortId {
183 #[new]
184 #[pyo3(signature = (*, actor_id, port))]
185 fn new(actor_id: &PyActorAddr, port: u64) -> Self {
186 Self {
187 inner: actor_id.inner.port_addr(port.into()),
188 }
189 }
190
191 #[staticmethod]
192 fn from_string(port_id: &str) -> PyResult<Self> {
193 Ok(Self {
194 inner: port_id.parse().map_err(|e| {
195 PyValueError::new_err(format!("Failed to parse port id '{}': {}", port_id, e))
196 })?,
197 })
198 }
199
200 #[getter]
201 fn actor_id(&self) -> PyActorAddr {
202 PyActorAddr {
203 inner: self.inner.actor_addr(),
204 }
205 }
206
207 #[getter]
208 fn index(&self) -> u64 {
209 self.inner.index()
210 }
211
212 fn __repr__(&self) -> String {
213 self.inner.to_string()
214 }
215
216 fn __hash__(&self) -> u64 {
217 let mut hasher = DefaultHasher::new();
218 self.inner.to_string().hash(&mut hasher);
219 hasher.finish()
220 }
221
222 fn __eq__(&self, other: &Bound<'_, PyAny>) -> PyResult<bool> {
223 if let Ok(other) = other.extract::<PyPortId>() {
224 Ok(self.inner == other.inner)
225 } else {
226 Ok(false)
227 }
228 }
229
230 fn __reduce__<'py>(slf: &Bound<'py, Self>) -> PyResult<(Bound<'py, PyAny>, (String,))> {
231 Ok((slf.getattr("from_string")?, (slf.borrow().__repr__(),)))
232 }
233}
234
// Debug output for `PyPortId` is delegated to the wrapped address.
impl std::fmt::Debug for PyPortId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // NOTE(review): the trait supplying `fmt` for the inner address is not
        // among the imports visible in this file — confirm whether this
        // renders the Debug or the Display form before changing it.
        self.inner.fmt(f)
    }
}
240
/// Wrapper that exposes a `PortHandle<PythonMessage>` to Python.
///
/// Registered with Python under the name `PortHandle` in
/// `monarch._rust_bindings.monarch_hyperactor.mailbox`.
#[derive(Clone, Debug)]
#[pyclass(
    name = "PortHandle",
    module = "monarch._rust_bindings.monarch_hyperactor.mailbox"
)]
pub(crate) struct PythonPortHandle {
    /// The wrapped port handle.
    inner: PortHandle<PythonMessage>,
}
249
250impl PythonPortHandle {
251 pub(crate) fn new(inner: PortHandle<PythonMessage>) -> Self {
252 Self { inner }
253 }
254}
255
256#[pymethods]
257impl PythonPortHandle {
258 fn send(&self, instance: &PyInstance, message: PythonMessage) -> PyResult<()> {
259 self.inner.post(instance.deref(), message);
260 Ok(())
261 }
262
263 fn bind(&self) -> PythonPortRef {
264 PythonPortRef {
265 inner: self.inner.bind(),
266 }
267 }
268}
269
/// Wrapper that exposes a `hyperactor::PortRef<PythonMessage>` to Python.
///
/// Registered with Python under the name `PortRef` in
/// `monarch._rust_bindings.monarch_hyperactor.mailbox`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[pyclass(
    name = "PortRef",
    module = "monarch._rust_bindings.monarch_hyperactor.mailbox"
)]
pub struct PythonPortRef {
    /// The wrapped port reference.
    pub(crate) inner: hyperactor::PortRef<PythonMessage>,
}
278
279#[pymethods]
280impl PythonPortRef {
281 #[new]
282 fn new(port: PyPortId) -> Self {
283 Self {
284 inner: hyperactor::PortRef::attest(port.inner),
285 }
286 }
287 fn __reduce__(slf: Bound<'_, PythonPortRef>) -> PyResult<(Bound<'_, PyType>, (PyPortId,))> {
288 let id: PyPortId = (*slf.borrow()).inner.port_addr().clone().into();
289 Ok((slf.get_type(), (id,)))
290 }
291
292 fn send(&self, instance: &PyInstance, message: PythonMessage) -> PyResult<()> {
293 self.inner.post(instance.deref(), message);
294 Ok(())
295 }
296
297 fn __repr__(&self) -> String {
298 self.inner.to_string()
299 }
300
301 #[getter]
302 fn port_id(&self) -> PyResult<PyPortId> {
303 Ok(self.inner.port_addr().clone().into())
304 }
305
306 #[getter]
307 fn get_return_undeliverable(&self) -> bool {
308 self.inner.get_return_undeliverable()
309 }
310
311 #[setter]
312 fn set_return_undeliverable(&mut self, return_undeliverable: bool) {
313 self.inner.return_undeliverable(return_undeliverable);
314 }
315}
316
317impl From<hyperactor::PortRef<PythonMessage>> for PythonPortRef {
318 fn from(port_ref: hyperactor::PortRef<PythonMessage>) -> Self {
319 Self { inner: port_ref }
320 }
321}
322
/// Wrapper that exposes a `PortReceiver<PythonMessage>` to Python.
///
/// Registered with Python under the name `PortReceiver` in
/// `monarch._rust_bindings.monarch_hyperactor.mailbox`.
#[derive(Debug)]
#[pyclass(
    name = "PortReceiver",
    module = "monarch._rust_bindings.monarch_hyperactor.mailbox"
)]
pub(super) struct PythonPortReceiver {
    /// The receiver, shared behind an async mutex so that receive futures can
    /// hold their own clone of the `Arc`.
    inner: Arc<tokio::sync::Mutex<PortReceiver<PythonMessage>>>,
}
331
332async fn recv_async(
333 receiver: Arc<tokio::sync::Mutex<PortReceiver<PythonMessage>>>,
334) -> PyResult<Py<PyAny>> {
335 let message = receiver
336 .lock()
337 .await
338 .recv()
339 .await
340 .map_err(|err| PyErr::new::<PyEOFError, _>(format!("Port closed: {}", err)))?;
341
342 monarch_with_gil(|py| message.into_py_any(py)).await
343}
344
345#[pymethods]
346impl PythonPortReceiver {
347 fn recv_task(&mut self) -> PyResult<PyPythonTask> {
348 let receiver = self.inner.clone();
349 Ok(PythonTask::new(recv_async(receiver))?.into())
350 }
351}
352
353impl PythonPortReceiver {
354 #[allow(dead_code)]
355 pub(super) fn inner(&self) -> Arc<tokio::sync::Mutex<PortReceiver<PythonMessage>>> {
356 Arc::clone(&self.inner)
357 }
358}
359
/// Wrapper that exposes an `Undeliverable<MessageEnvelope>` to Python.
///
/// Registered with Python under the name `UndeliverableMessageEnvelope` in
/// `monarch._rust_bindings.monarch_hyperactor.mailbox`.
#[derive(Debug)]
#[pyclass(
    name = "UndeliverableMessageEnvelope",
    module = "monarch._rust_bindings.monarch_hyperactor.mailbox"
)]
pub(crate) struct PythonUndeliverableMessageEnvelope {
    /// The wrapped value; `None` once it has been moved out with `take`.
    pub(crate) inner: Option<Undeliverable<MessageEnvelope>>,
}
368
369impl PythonUndeliverableMessageEnvelope {
370 fn inner(&self) -> PyResult<&Undeliverable<MessageEnvelope>> {
371 self.inner.as_ref().ok_or_else(|| {
372 PyErr::new::<PyRuntimeError, _>(
373 "PythonUndeliverableMessageEnvelope was already consumed",
374 )
375 })
376 }
377
378 pub(crate) fn take(&mut self) -> anyhow::Result<Undeliverable<MessageEnvelope>> {
379 self.inner.take().ok_or_else(|| {
380 anyhow::anyhow!("PythonUndeliverableMessageEnvelope was already consumed")
381 })
382 }
383}
384
385#[pymethods]
386impl PythonUndeliverableMessageEnvelope {
387 fn __repr__(&self) -> PyResult<String> {
388 let inner = self.inner()?;
389 let Some(envelope) = inner.as_message() else {
390 return Ok("UndeliverableMessageEnvelope(lost)".to_string());
391 };
392 Ok(format!(
393 "UndeliverableMessageEnvelope(sender={}, dest={}, error={})",
394 envelope.sender(),
395 envelope.dest(),
396 self.error_msg()?
397 ))
398 }
399
400 fn sender(&self) -> PyResult<PyActorAddr> {
401 let envelope = self.inner()?.as_message().ok_or_else(|| {
402 PyErr::new::<PyRuntimeError, _>("lost undeliverable messages do not have an envelope")
403 })?;
404 Ok(PyActorAddr {
405 inner: envelope.sender().clone(),
406 })
407 }
408
409 fn dest(&self) -> PyResult<PyPortId> {
410 let envelope = self.inner()?.as_message().ok_or_else(|| {
411 PyErr::new::<PyRuntimeError, _>("lost undeliverable messages do not have an envelope")
412 })?;
413 let port_id: hyperactor::PortAddr = envelope.dest().clone();
414 Ok(port_id.into())
415 }
416
417 fn error_msg(&self) -> PyResult<String> {
418 match self.inner()? {
419 Undeliverable::Message(envelope) => {
420 Ok(envelope.error_msg().unwrap_or_else(|| "None".to_string()))
421 }
422 Undeliverable::Lost(lost) => Ok(lost.error.clone()),
423 }
424 }
425}
426
/// Wrapper that exposes an `OncePortHandle<PythonMessage>` to Python.
///
/// Registered with Python under the name `OncePortHandle` in
/// `monarch._rust_bindings.monarch_hyperactor.mailbox`.
#[derive(Debug)]
#[pyclass(
    name = "OncePortHandle",
    module = "monarch._rust_bindings.monarch_hyperactor.mailbox"
)]
pub(super) struct PythonOncePortHandle {
    /// The wrapped handle; `None` after `send` or `bind` has taken it.
    inner: Option<OncePortHandle<PythonMessage>>,
}
435
436#[pymethods]
437impl PythonOncePortHandle {
438 fn send(&mut self, instance: &PyInstance, message: PythonMessage) -> PyResult<()> {
439 let Some(port) = self.inner.take() else {
440 return Err(PyErr::new::<PyValueError, _>("OncePort is already used"));
441 };
442 port.post(instance.deref(), message);
443 Ok(())
444 }
445
446 fn bind(&mut self) -> PyResult<PythonOncePortRef> {
447 let Some(port) = self.inner.take() else {
448 return Err(PyErr::new::<PyValueError, _>("OncePort is already used"));
449 };
450 Ok(PythonOncePortRef {
451 inner: Some(port.bind()),
452 })
453 }
454}
455
/// Wrapper that exposes a `hyperactor::OncePortRef<PythonMessage>` to Python.
///
/// Registered with Python under the name `OncePortRef` in
/// `monarch._rust_bindings.monarch_hyperactor.mailbox`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[pyclass(
    name = "OncePortRef",
    module = "monarch._rust_bindings.monarch_hyperactor.mailbox"
)]
pub struct PythonOncePortRef {
    /// The wrapped reference; `None` once it has been taken for sending, or
    /// when the object was constructed without a port.
    pub(crate) inner: Option<hyperactor::OncePortRef<PythonMessage>>,
}
464
465#[pymethods]
466impl PythonOncePortRef {
467 #[new]
468 fn new(port: Option<PyPortId>) -> Self {
469 Self {
470 inner: port.map(|port| hyperactor::PortRef::attest(port.inner).into_once()),
471 }
472 }
473 fn __reduce__(
474 slf: Bound<'_, PythonOncePortRef>,
475 ) -> PyResult<(Bound<'_, PyType>, (Option<PyPortId>,))> {
476 let id: Option<PyPortId> = (*slf.borrow())
477 .inner
478 .as_ref()
479 .map(|x: &hyperactor::OncePortRef<PythonMessage>| x.port_addr().clone().into());
480 Ok((slf.get_type(), (id,)))
481 }
482
483 fn send(&mut self, instance: &PyInstance, message: PythonMessage) -> PyResult<()> {
484 let Some(port_ref) = self.inner.take() else {
485 return Err(PyErr::new::<PyValueError, _>("OncePortRef is already used"));
486 };
487 let port_ref: hyperactor::OncePortRef<PythonMessage> = port_ref;
488 port_ref.post(instance.deref(), message);
489 Ok(())
490 }
491
492 fn __repr__(&self) -> String {
493 self.inner.as_ref().map_or(
494 "OncePortRef is already used".to_string(),
495 |r: &hyperactor::OncePortRef<PythonMessage>| r.to_string(),
496 )
497 }
498
499 #[getter]
500 fn port_id(&self) -> PyResult<PyPortId> {
501 Ok(self.inner.as_ref().unwrap().port_addr().clone().into())
502 }
503
504 #[getter]
505 fn get_return_undeliverable(&self) -> bool {
506 self.inner.as_ref().unwrap().get_return_undeliverable()
507 }
508
509 #[setter]
510 fn set_return_undeliverable(&mut self, return_undeliverable: bool) {
511 if let Some(ref mut inner) = self.inner {
512 inner.return_undeliverable(return_undeliverable);
513 }
514 }
515}
516
517impl From<hyperactor::OncePortRef<PythonMessage>> for PythonOncePortRef {
518 fn from(port_ref: hyperactor::OncePortRef<PythonMessage>) -> Self {
519 Self {
520 inner: Some(port_ref),
521 }
522 }
523}
524
/// Wrapper that exposes an `OncePortReceiver<PythonMessage>` to Python.
///
/// Registered with Python under the name `OncePortReceiver` in
/// `monarch._rust_bindings.monarch_hyperactor.mailbox`.
#[pyclass(
    name = "OncePortReceiver",
    module = "monarch._rust_bindings.monarch_hyperactor.mailbox"
)]
pub(super) struct PythonOncePortReceiver {
    /// The receiver; the `Option` becomes `None` once `recv_task` takes it.
    inner: Arc<std::sync::Mutex<Option<OncePortReceiver<PythonMessage>>>>,
}
532
533#[pymethods]
534impl PythonOncePortReceiver {
535 fn recv_task(&mut self) -> PyResult<PyPythonTask> {
536 let Some(receiver) = self.inner.lock().unwrap().take() else {
537 return Err(PyErr::new::<PyValueError, _>("OncePort is already used"));
538 };
539 let fut = async move {
540 let message = receiver
541 .recv()
542 .await
543 .map_err(|err| PyErr::new::<PyEOFError, _>(format!("Port closed: {}", err)))?;
544
545 monarch_with_gil(|py| message.into_py_any(py)).await
546 };
547 Ok(PythonTask::new(fut)?.into())
548 }
549}
550
551impl PythonOncePortReceiver {
552 #[allow(dead_code)]
553 pub(super) fn inner(&self) -> Arc<std::sync::Mutex<Option<OncePortReceiver<PythonMessage>>>> {
554 Arc::clone(&self.inner)
555 }
556}
557
/// A port reference that is either unbounded or one-shot.
#[derive(
    Clone,
    Serialize,
    Deserialize,
    Named,
    PartialEq,
    FromPyObject,
    IntoPyObject,
    Debug
)]
pub enum EitherPortRef {
    /// A reusable port reference.
    Unbounded(PythonPortRef),
    /// A one-shot port reference, consumed by the first post.
    Once(PythonOncePortRef),
}
572
573impl Unbind for EitherPortRef {
574 fn unbind(&self, bindings: &mut Bindings) -> anyhow::Result<()> {
575 match self {
576 EitherPortRef::Unbounded(port_ref) => port_ref.inner.unbind(bindings),
577 EitherPortRef::Once(once_port_ref) => once_port_ref.inner.unbind(bindings),
578 }
579 }
580}
581
582impl Bind for EitherPortRef {
583 fn bind(&mut self, bindings: &mut Bindings) -> anyhow::Result<()> {
584 match self {
585 EitherPortRef::Unbounded(port_ref) => port_ref.inner.bind(bindings),
586 EitherPortRef::Once(once_port_ref) => once_port_ref.inner.bind(bindings),
587 }
588 }
589}
590
591impl EitherPortRef {
592 pub fn get_return_undeliverable(&self) -> bool {
593 match self {
594 EitherPortRef::Unbounded(port_ref) => port_ref.inner.get_return_undeliverable(),
595 EitherPortRef::Once(once_port_ref) => once_port_ref.inner.as_ref().is_some_and(
596 |r: &hyperactor::OncePortRef<PythonMessage>| r.get_return_undeliverable(),
597 ),
598 }
599 }
600
601 pub fn set_return_undeliverable(&mut self, return_undeliverable: bool) {
602 match self {
603 EitherPortRef::Unbounded(port_ref) => {
604 port_ref.inner.return_undeliverable(return_undeliverable);
605 }
606 EitherPortRef::Once(once_port_ref) => {
607 if let Some(ref mut inner) = once_port_ref.inner {
608 inner.return_undeliverable(return_undeliverable);
609 }
610 }
611 }
612 }
613
614 pub fn post(
617 &mut self,
618 cx: &impl hyperactor::context::Actor,
619 message: crate::actor::PythonMessage,
620 ) -> anyhow::Result<()> {
621 match self {
622 EitherPortRef::Unbounded(port_ref) => port_ref.inner.post(cx, message),
623 EitherPortRef::Once(once_port_ref) => {
624 let port = once_port_ref
625 .inner
626 .take()
627 .ok_or_else(|| anyhow::anyhow!("OncePortRef already used"))?;
628 port.post(cx, message);
629 }
630 }
631 Ok(())
632 }
633
634 pub fn post_with_headers(
639 &mut self,
640 cx: &impl hyperactor::context::Actor,
641 headers: hyperactor_config::Flattrs,
642 message: crate::actor::PythonMessage,
643 ) -> anyhow::Result<()> {
644 match self {
645 EitherPortRef::Unbounded(port_ref) => {
646 port_ref.inner.post_with_headers(cx, headers, message)
647 }
648 EitherPortRef::Once(once_port_ref) => {
649 let port = once_port_ref
650 .inner
651 .take()
652 .ok_or_else(|| anyhow::anyhow!("OncePortRef already used"))?;
653 port.post_with_headers(cx, headers, message);
654 }
655 }
656 Ok(())
657 }
658}
659
/// Reducer backed by a Python object, which is invoked as
/// `callable(left, right)` to combine two updates.
#[derive(Debug, Named)]
struct PythonReducer(Py<PyAny>);
662
663impl PythonReducer {
664 fn new(params: Option<wirevalue::Any>) -> anyhow::Result<Self> {
665 let p = params.ok_or_else(|| anyhow::anyhow!("params cannot be None"))?;
666 let obj: PickledPyObject = p.deserialized()?;
667 Ok(monarch_with_gil_blocking(
668 |py: Python<'_>| -> PyResult<Self> {
669 let unpickled = obj.unpickle(py)?;
670 Ok(Self(unpickled.unbind()))
671 },
672 )?)
673 }
674}
675
676impl CommReducer for PythonReducer {
677 type Update = PythonMessage;
678
679 fn reduce(&self, left: Self::Update, right: Self::Update) -> anyhow::Result<Self::Update> {
680 monarch_with_gil_blocking(|py: Python<'_>| -> PyResult<PythonMessage> {
681 let result = self.0.call(py, (left, right), None)?;
682 result.extract::<PythonMessage>(py)
683 })
684 .map_err(Into::into)
685 }
686}
687
/// Accumulator backed by a Python object.
struct PythonAccumulator {
    /// The Python accumulator: called as `accumulator(state, update)`, and
    /// read for its `initial_state` and `reducer` attributes.
    accumulator: Py<PyAny>,
    /// Serialized, cloudpickled form of the accumulator's `reducer`
    /// attribute; `None` when that attribute is Python `None`.
    reducer: Option<wirevalue::Any>,
}
692
693impl PythonAccumulator {
694 fn new(py: Python<'_>, accumulator: Py<PyAny>) -> PyResult<Self> {
695 let py_reducer = accumulator.getattr(py, "reducer")?;
696 let reducer: Option<wirevalue::Any> = if py_reducer.is_none(py) {
697 None
698 } else {
699 let pickled = PickledPyObject::cloudpickle(py_reducer.bind(py))?;
700 Some(
701 wirevalue::Any::serialize(&pickled)
702 .map_err(|e| PyRuntimeError::new_err(e.to_string()))?,
703 )
704 };
705
706 Ok(Self {
707 accumulator,
708 reducer,
709 })
710 }
711}
712
713impl Accumulator for PythonAccumulator {
714 type State = PythonMessage;
715 type Update = PythonMessage;
716
717 fn accumulate(&self, state: &mut Self::State, update: Self::Update) -> anyhow::Result<()> {
718 monarch_with_gil_blocking(|py: Python<'_>| -> PyResult<()> {
719 if matches!(state.kind, PythonMessageKind::Uninit {}) {
721 *state = self
722 .accumulator
723 .getattr(py, "initial_state")?
724 .extract::<PythonMessage>(py)?;
725 }
726
727 let old_state = state.clone();
730 let result = self.accumulator.call(py, (old_state, update), None)?;
731 *state = result.extract::<PythonMessage>(py)?;
732 Ok(())
733 })
734 .map_err(Into::into)
735 }
736
737 fn reducer_spec(&self) -> Option<ReducerSpec> {
738 self.reducer.as_ref().map(|r| ReducerSpec {
739 typehash: <PythonReducer as Named>::typehash(),
740 builder_params: Some(r.clone()),
741 })
742 }
743}
744
// Submits a `ReducerFactory` entry for `PythonReducer`: it is keyed by the
// reducer's typehash, and its builder constructs a boxed `PythonReducer`
// from the supplied builder parameters.
// NOTE(review): presumably this is how the `ReducerSpec` returned by
// `PythonAccumulator::reducer_spec` gets resolved — confirm against the
// `hyperactor::accum` lookup code.
inventory::submit! {
    ReducerFactory {
        typehash_f: <PythonReducer as Named>::typehash,
        builder_f: |params| Ok(Box::new(PythonReducer::new(params)?)),
    }
}
751
// Declares `point` for the Python name `Point` in
// `monarch._src.actor.actor_mesh`.
// NOTE(review): the exact semantics come from the `py_global!` macro, which
// is defined outside this file; `point` is not referenced in the code
// visible here.
py_global!(point, "monarch._src.actor.actor_mesh", "Point");
753
754pub fn register_python_bindings(hyperactor_mod: &Bound<'_, PyModule>) -> PyResult<()> {
755 hyperactor_mod.add_class::<PyMailbox>()?;
756 hyperactor_mod.add_class::<PyPortId>()?;
757 hyperactor_mod.add_class::<PythonPortHandle>()?;
758 hyperactor_mod.add_class::<PythonPortRef>()?;
759 hyperactor_mod.add_class::<PythonPortReceiver>()?;
760 hyperactor_mod.add_class::<PythonOncePortHandle>()?;
761 hyperactor_mod.add_class::<PythonOncePortRef>()?;
762 hyperactor_mod.add_class::<PythonOncePortReceiver>()?;
763 hyperactor_mod.add_class::<PythonUndeliverableMessageEnvelope>()?;
764 Ok(())
765}