Skip to main content

monarch_hyperactor/
mailbox.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9use std::hash::DefaultHasher;
10use std::hash::Hash;
11use std::hash::Hasher;
12use std::ops::Deref;
13use std::sync::Arc;
14
15use hyperactor::Endpoint as _;
16use hyperactor::Mailbox;
17use hyperactor::OncePortHandle;
18use hyperactor::PortHandle;
19use hyperactor::RemoteEndpoint as _;
20use hyperactor::accum::Accumulator;
21use hyperactor::accum::CommReducer;
22use hyperactor::accum::ReducerFactory;
23use hyperactor::accum::ReducerSpec;
24use hyperactor::mailbox::MailboxSender;
25use hyperactor::mailbox::MessageEnvelope;
26use hyperactor::mailbox::OncePortReceiver;
27use hyperactor::mailbox::PortReceiver;
28use hyperactor::mailbox::Undeliverable;
29use hyperactor::mailbox::monitored_return_handle;
30use hyperactor::message::Bind;
31use hyperactor::message::Bindings;
32use hyperactor::message::Unbind;
33use hyperactor_config::Flattrs;
34use monarch_types::PickledPyObject;
35use monarch_types::py_global;
36use pyo3::IntoPyObjectExt;
37use pyo3::exceptions::PyEOFError;
38use pyo3::exceptions::PyRuntimeError;
39use pyo3::exceptions::PyValueError;
40use pyo3::prelude::*;
41use pyo3::types::PyTuple;
42use pyo3::types::PyType;
43use serde::Deserialize;
44use serde::Serialize;
45use typeuri::Named;
46
47use crate::actor::PythonMessage;
48use crate::actor::PythonMessageKind;
49use crate::context::PyInstance;
50use crate::proc::PyActorAddr;
51use crate::pytokio::PyPythonTask;
52use crate::pytokio::PythonTask;
53use crate::runtime::monarch_with_gil;
54use crate::runtime::monarch_with_gil_blocking;
55
56#[derive(Clone, Debug)]
57#[pyclass(
58    name = "Mailbox",
59    module = "monarch._rust_bindings.monarch_hyperactor.mailbox"
60)]
61pub struct PyMailbox {
62    pub(super) inner: Mailbox,
63}
64
65impl PyMailbox {
66    pub fn get_inner(&self) -> &Mailbox {
67        &self.inner
68    }
69}
70
71#[pymethods]
72impl PyMailbox {
73    fn open_port<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyTuple>> {
74        let (handle, receiver) = self.inner.open_port();
75        let handle = Py::new(py, PythonPortHandle { inner: handle })?;
76        let receiver = Py::new(
77            py,
78            PythonPortReceiver {
79                inner: Arc::new(tokio::sync::Mutex::new(receiver)),
80            },
81        )?;
82        PyTuple::new(py, vec![handle.into_any(), receiver.into_any()])
83    }
84
85    fn open_once_port<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyTuple>> {
86        let (handle, receiver) = self.inner.open_once_port();
87        let handle = Py::new(
88            py,
89            PythonOncePortHandle {
90                inner: Some(handle),
91            },
92        )?;
93        let receiver = Py::new(
94            py,
95            PythonOncePortReceiver {
96                inner: Arc::new(std::sync::Mutex::new(Some(receiver))),
97            },
98        )?;
99        PyTuple::new(py, vec![handle.into_any(), receiver.into_any()])
100    }
101
102    fn open_accum_port<'py>(
103        &self,
104        py: Python<'py>,
105        accumulator: Py<PyAny>,
106    ) -> PyResult<Bound<'py, PyTuple>> {
107        let py_accumulator = PythonAccumulator::new(py, accumulator)?;
108        let (handle, receiver) = self.inner.open_accum_port(py_accumulator);
109        let handle = Py::new(py, PythonPortHandle { inner: handle })?;
110        let receiver = Py::new(
111            py,
112            PythonPortReceiver {
113                inner: Arc::new(tokio::sync::Mutex::new(receiver)),
114            },
115        )?;
116        PyTuple::new(py, vec![handle.into_any(), receiver.into_any()])
117    }
118
119    pub(super) fn post(&self, dest: &PyActorAddr, message: &PythonMessage) -> PyResult<()> {
120        let port_id = dest.inner.port_addr(PythonMessage::port().into());
121        let message = wirevalue::Any::serialize(message).map_err(|err| {
122            PyRuntimeError::new_err(format!(
123                "failed to serialize message ({:?}) to Any: {}",
124                message, err
125            ))
126        })?;
127        let envelope = MessageEnvelope::new(
128            self.inner.actor_addr().clone(),
129            port_id,
130            message,
131            Flattrs::new(),
132        );
133        let return_handle = self
134            .inner
135            .bound_return_handle()
136            .unwrap_or(monitored_return_handle());
137        self.inner.post(envelope, return_handle);
138        Ok(())
139    }
140
141    #[getter]
142    pub(super) fn actor_id(&self) -> PyActorAddr {
143        PyActorAddr {
144            inner: self.inner.actor_addr().clone(),
145        }
146    }
147
148    fn __repr__(&self) -> String {
149        format!("{:?}", self.inner)
150    }
151}
152
153#[pyclass(
154    frozen,
155    name = "PortId",
156    module = "monarch._rust_bindings.monarch_hyperactor.mailbox"
157)]
158#[derive(Clone)]
159pub struct PyPortId {
160    inner: hyperactor::PortAddr,
161}
162
163impl From<hyperactor::PortAddr> for PyPortId {
164    fn from(port_id: hyperactor::PortAddr) -> Self {
165        Self { inner: port_id }
166    }
167}
168
169impl From<PyPortId> for hyperactor::PortAddr {
170    fn from(port_id: PyPortId) -> Self {
171        port_id.inner
172    }
173}
174
175impl From<Mailbox> for PyMailbox {
176    fn from(inner: Mailbox) -> Self {
177        PyMailbox { inner }
178    }
179}
180
181#[pymethods]
182impl PyPortId {
183    #[new]
184    #[pyo3(signature = (*, actor_id, port))]
185    fn new(actor_id: &PyActorAddr, port: u64) -> Self {
186        Self {
187            inner: actor_id.inner.port_addr(port.into()),
188        }
189    }
190
191    #[staticmethod]
192    fn from_string(port_id: &str) -> PyResult<Self> {
193        Ok(Self {
194            inner: port_id.parse().map_err(|e| {
195                PyValueError::new_err(format!("Failed to parse port id '{}': {}", port_id, e))
196            })?,
197        })
198    }
199
200    #[getter]
201    fn actor_id(&self) -> PyActorAddr {
202        PyActorAddr {
203            inner: self.inner.actor_addr(),
204        }
205    }
206
207    #[getter]
208    fn index(&self) -> u64 {
209        self.inner.index()
210    }
211
212    fn __repr__(&self) -> String {
213        self.inner.to_string()
214    }
215
216    fn __hash__(&self) -> u64 {
217        let mut hasher = DefaultHasher::new();
218        self.inner.to_string().hash(&mut hasher);
219        hasher.finish()
220    }
221
222    fn __eq__(&self, other: &Bound<'_, PyAny>) -> PyResult<bool> {
223        if let Ok(other) = other.extract::<PyPortId>() {
224            Ok(self.inner == other.inner)
225        } else {
226            Ok(false)
227        }
228    }
229
230    fn __reduce__<'py>(slf: &Bound<'py, Self>) -> PyResult<(Bound<'py, PyAny>, (String,))> {
231        Ok((slf.getattr("from_string")?, (slf.borrow().__repr__(),)))
232    }
233}
234
235impl std::fmt::Debug for PyPortId {
236    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
237        self.inner.fmt(f)
238    }
239}
240
241#[derive(Clone, Debug)]
242#[pyclass(
243    name = "PortHandle",
244    module = "monarch._rust_bindings.monarch_hyperactor.mailbox"
245)]
246pub(crate) struct PythonPortHandle {
247    inner: PortHandle<PythonMessage>,
248}
249
250impl PythonPortHandle {
251    pub(crate) fn new(inner: PortHandle<PythonMessage>) -> Self {
252        Self { inner }
253    }
254}
255
256#[pymethods]
257impl PythonPortHandle {
258    fn send(&self, instance: &PyInstance, message: PythonMessage) -> PyResult<()> {
259        self.inner.post(instance.deref(), message);
260        Ok(())
261    }
262
263    fn bind(&self) -> PythonPortRef {
264        PythonPortRef {
265            inner: self.inner.bind(),
266        }
267    }
268}
269
270#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
271#[pyclass(
272    name = "PortRef",
273    module = "monarch._rust_bindings.monarch_hyperactor.mailbox"
274)]
275pub struct PythonPortRef {
276    pub(crate) inner: hyperactor::PortRef<PythonMessage>,
277}
278
279#[pymethods]
280impl PythonPortRef {
281    #[new]
282    fn new(port: PyPortId) -> Self {
283        Self {
284            inner: hyperactor::PortRef::attest(port.inner),
285        }
286    }
287    fn __reduce__(slf: Bound<'_, PythonPortRef>) -> PyResult<(Bound<'_, PyType>, (PyPortId,))> {
288        let id: PyPortId = (*slf.borrow()).inner.port_addr().clone().into();
289        Ok((slf.get_type(), (id,)))
290    }
291
292    fn send(&self, instance: &PyInstance, message: PythonMessage) -> PyResult<()> {
293        self.inner.post(instance.deref(), message);
294        Ok(())
295    }
296
297    fn __repr__(&self) -> String {
298        self.inner.to_string()
299    }
300
301    #[getter]
302    fn port_id(&self) -> PyResult<PyPortId> {
303        Ok(self.inner.port_addr().clone().into())
304    }
305
306    #[getter]
307    fn get_return_undeliverable(&self) -> bool {
308        self.inner.get_return_undeliverable()
309    }
310
311    #[setter]
312    fn set_return_undeliverable(&mut self, return_undeliverable: bool) {
313        self.inner.return_undeliverable(return_undeliverable);
314    }
315}
316
317impl From<hyperactor::PortRef<PythonMessage>> for PythonPortRef {
318    fn from(port_ref: hyperactor::PortRef<PythonMessage>) -> Self {
319        Self { inner: port_ref }
320    }
321}
322
323#[derive(Debug)]
324#[pyclass(
325    name = "PortReceiver",
326    module = "monarch._rust_bindings.monarch_hyperactor.mailbox"
327)]
328pub(super) struct PythonPortReceiver {
329    inner: Arc<tokio::sync::Mutex<PortReceiver<PythonMessage>>>,
330}
331
332async fn recv_async(
333    receiver: Arc<tokio::sync::Mutex<PortReceiver<PythonMessage>>>,
334) -> PyResult<Py<PyAny>> {
335    let message = receiver
336        .lock()
337        .await
338        .recv()
339        .await
340        .map_err(|err| PyErr::new::<PyEOFError, _>(format!("Port closed: {}", err)))?;
341
342    monarch_with_gil(|py| message.into_py_any(py)).await
343}
344
345#[pymethods]
346impl PythonPortReceiver {
347    fn recv_task(&mut self) -> PyResult<PyPythonTask> {
348        let receiver = self.inner.clone();
349        Ok(PythonTask::new(recv_async(receiver))?.into())
350    }
351}
352
353impl PythonPortReceiver {
354    #[allow(dead_code)]
355    pub(super) fn inner(&self) -> Arc<tokio::sync::Mutex<PortReceiver<PythonMessage>>> {
356        Arc::clone(&self.inner)
357    }
358}
359
360#[derive(Debug)]
361#[pyclass(
362    name = "UndeliverableMessageEnvelope",
363    module = "monarch._rust_bindings.monarch_hyperactor.mailbox"
364)]
365pub(crate) struct PythonUndeliverableMessageEnvelope {
366    pub(crate) inner: Option<Undeliverable<MessageEnvelope>>,
367}
368
369impl PythonUndeliverableMessageEnvelope {
370    fn inner(&self) -> PyResult<&Undeliverable<MessageEnvelope>> {
371        self.inner.as_ref().ok_or_else(|| {
372            PyErr::new::<PyRuntimeError, _>(
373                "PythonUndeliverableMessageEnvelope was already consumed",
374            )
375        })
376    }
377
378    pub(crate) fn take(&mut self) -> anyhow::Result<Undeliverable<MessageEnvelope>> {
379        self.inner.take().ok_or_else(|| {
380            anyhow::anyhow!("PythonUndeliverableMessageEnvelope was already consumed")
381        })
382    }
383}
384
385#[pymethods]
386impl PythonUndeliverableMessageEnvelope {
387    fn __repr__(&self) -> PyResult<String> {
388        let inner = self.inner()?;
389        let Some(envelope) = inner.as_message() else {
390            return Ok("UndeliverableMessageEnvelope(lost)".to_string());
391        };
392        Ok(format!(
393            "UndeliverableMessageEnvelope(sender={}, dest={}, error={})",
394            envelope.sender(),
395            envelope.dest(),
396            self.error_msg()?
397        ))
398    }
399
400    fn sender(&self) -> PyResult<PyActorAddr> {
401        let envelope = self.inner()?.as_message().ok_or_else(|| {
402            PyErr::new::<PyRuntimeError, _>("lost undeliverable messages do not have an envelope")
403        })?;
404        Ok(PyActorAddr {
405            inner: envelope.sender().clone(),
406        })
407    }
408
409    fn dest(&self) -> PyResult<PyPortId> {
410        let envelope = self.inner()?.as_message().ok_or_else(|| {
411            PyErr::new::<PyRuntimeError, _>("lost undeliverable messages do not have an envelope")
412        })?;
413        let port_id: hyperactor::PortAddr = envelope.dest().clone();
414        Ok(port_id.into())
415    }
416
417    fn error_msg(&self) -> PyResult<String> {
418        match self.inner()? {
419            Undeliverable::Message(envelope) => {
420                Ok(envelope.error_msg().unwrap_or_else(|| "None".to_string()))
421            }
422            Undeliverable::Lost(lost) => Ok(lost.error.clone()),
423        }
424    }
425}
426
427#[derive(Debug)]
428#[pyclass(
429    name = "OncePortHandle",
430    module = "monarch._rust_bindings.monarch_hyperactor.mailbox"
431)]
432pub(super) struct PythonOncePortHandle {
433    inner: Option<OncePortHandle<PythonMessage>>,
434}
435
436#[pymethods]
437impl PythonOncePortHandle {
438    fn send(&mut self, instance: &PyInstance, message: PythonMessage) -> PyResult<()> {
439        let Some(port) = self.inner.take() else {
440            return Err(PyErr::new::<PyValueError, _>("OncePort is already used"));
441        };
442        port.post(instance.deref(), message);
443        Ok(())
444    }
445
446    fn bind(&mut self) -> PyResult<PythonOncePortRef> {
447        let Some(port) = self.inner.take() else {
448            return Err(PyErr::new::<PyValueError, _>("OncePort is already used"));
449        };
450        Ok(PythonOncePortRef {
451            inner: Some(port.bind()),
452        })
453    }
454}
455
456#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
457#[pyclass(
458    name = "OncePortRef",
459    module = "monarch._rust_bindings.monarch_hyperactor.mailbox"
460)]
461pub struct PythonOncePortRef {
462    pub(crate) inner: Option<hyperactor::OncePortRef<PythonMessage>>,
463}
464
465#[pymethods]
466impl PythonOncePortRef {
467    #[new]
468    fn new(port: Option<PyPortId>) -> Self {
469        Self {
470            inner: port.map(|port| hyperactor::PortRef::attest(port.inner).into_once()),
471        }
472    }
473    fn __reduce__(
474        slf: Bound<'_, PythonOncePortRef>,
475    ) -> PyResult<(Bound<'_, PyType>, (Option<PyPortId>,))> {
476        let id: Option<PyPortId> = (*slf.borrow())
477            .inner
478            .as_ref()
479            .map(|x: &hyperactor::OncePortRef<PythonMessage>| x.port_addr().clone().into());
480        Ok((slf.get_type(), (id,)))
481    }
482
483    fn send(&mut self, instance: &PyInstance, message: PythonMessage) -> PyResult<()> {
484        let Some(port_ref) = self.inner.take() else {
485            return Err(PyErr::new::<PyValueError, _>("OncePortRef is already used"));
486        };
487        let port_ref: hyperactor::OncePortRef<PythonMessage> = port_ref;
488        port_ref.post(instance.deref(), message);
489        Ok(())
490    }
491
492    fn __repr__(&self) -> String {
493        self.inner.as_ref().map_or(
494            "OncePortRef is already used".to_string(),
495            |r: &hyperactor::OncePortRef<PythonMessage>| r.to_string(),
496        )
497    }
498
499    #[getter]
500    fn port_id(&self) -> PyResult<PyPortId> {
501        Ok(self.inner.as_ref().unwrap().port_addr().clone().into())
502    }
503
504    #[getter]
505    fn get_return_undeliverable(&self) -> bool {
506        self.inner.as_ref().unwrap().get_return_undeliverable()
507    }
508
509    #[setter]
510    fn set_return_undeliverable(&mut self, return_undeliverable: bool) {
511        if let Some(ref mut inner) = self.inner {
512            inner.return_undeliverable(return_undeliverable);
513        }
514    }
515}
516
517impl From<hyperactor::OncePortRef<PythonMessage>> for PythonOncePortRef {
518    fn from(port_ref: hyperactor::OncePortRef<PythonMessage>) -> Self {
519        Self {
520            inner: Some(port_ref),
521        }
522    }
523}
524
525#[pyclass(
526    name = "OncePortReceiver",
527    module = "monarch._rust_bindings.monarch_hyperactor.mailbox"
528)]
529pub(super) struct PythonOncePortReceiver {
530    inner: Arc<std::sync::Mutex<Option<OncePortReceiver<PythonMessage>>>>,
531}
532
533#[pymethods]
534impl PythonOncePortReceiver {
535    fn recv_task(&mut self) -> PyResult<PyPythonTask> {
536        let Some(receiver) = self.inner.lock().unwrap().take() else {
537            return Err(PyErr::new::<PyValueError, _>("OncePort is already used"));
538        };
539        let fut = async move {
540            let message = receiver
541                .recv()
542                .await
543                .map_err(|err| PyErr::new::<PyEOFError, _>(format!("Port closed: {}", err)))?;
544
545            monarch_with_gil(|py| message.into_py_any(py)).await
546        };
547        Ok(PythonTask::new(fut)?.into())
548    }
549}
550
551impl PythonOncePortReceiver {
552    #[allow(dead_code)]
553    pub(super) fn inner(&self) -> Arc<std::sync::Mutex<Option<OncePortReceiver<PythonMessage>>>> {
554        Arc::clone(&self.inner)
555    }
556}
557
558#[derive(
559    Clone,
560    Serialize,
561    Deserialize,
562    Named,
563    PartialEq,
564    FromPyObject,
565    IntoPyObject,
566    Debug
567)]
568pub enum EitherPortRef {
569    Unbounded(PythonPortRef),
570    Once(PythonOncePortRef),
571}
572
573impl Unbind for EitherPortRef {
574    fn unbind(&self, bindings: &mut Bindings) -> anyhow::Result<()> {
575        match self {
576            EitherPortRef::Unbounded(port_ref) => port_ref.inner.unbind(bindings),
577            EitherPortRef::Once(once_port_ref) => once_port_ref.inner.unbind(bindings),
578        }
579    }
580}
581
582impl Bind for EitherPortRef {
583    fn bind(&mut self, bindings: &mut Bindings) -> anyhow::Result<()> {
584        match self {
585            EitherPortRef::Unbounded(port_ref) => port_ref.inner.bind(bindings),
586            EitherPortRef::Once(once_port_ref) => once_port_ref.inner.bind(bindings),
587        }
588    }
589}
590
591impl EitherPortRef {
592    pub fn get_return_undeliverable(&self) -> bool {
593        match self {
594            EitherPortRef::Unbounded(port_ref) => port_ref.inner.get_return_undeliverable(),
595            EitherPortRef::Once(once_port_ref) => once_port_ref.inner.as_ref().is_some_and(
596                |r: &hyperactor::OncePortRef<PythonMessage>| r.get_return_undeliverable(),
597            ),
598        }
599    }
600
601    pub fn set_return_undeliverable(&mut self, return_undeliverable: bool) {
602        match self {
603            EitherPortRef::Unbounded(port_ref) => {
604                port_ref.inner.return_undeliverable(return_undeliverable);
605            }
606            EitherPortRef::Once(once_port_ref) => {
607                if let Some(ref mut inner) = once_port_ref.inner {
608                    inner.return_undeliverable(return_undeliverable);
609                }
610            }
611        }
612    }
613
614    /// Post a message through this port reference.
615    /// The message is first resolved for any pending pickle state before sending.
616    pub fn post(
617        &mut self,
618        cx: &impl hyperactor::context::Actor,
619        message: crate::actor::PythonMessage,
620    ) -> anyhow::Result<()> {
621        match self {
622            EitherPortRef::Unbounded(port_ref) => port_ref.inner.post(cx, message),
623            EitherPortRef::Once(once_port_ref) => {
624                let port = once_port_ref
625                    .inner
626                    .take()
627                    .ok_or_else(|| anyhow::anyhow!("OncePortRef already used"))?;
628                port.post(cx, message);
629            }
630        }
631        Ok(())
632    }
633
634    /// Post a message through this port reference with
635    /// caller-supplied envelope headers. Delegates to the underlying
636    /// `PortRef::post_with_headers` /
637    /// `OncePortRef::post_with_headers`.
638    pub fn post_with_headers(
639        &mut self,
640        cx: &impl hyperactor::context::Actor,
641        headers: hyperactor_config::Flattrs,
642        message: crate::actor::PythonMessage,
643    ) -> anyhow::Result<()> {
644        match self {
645            EitherPortRef::Unbounded(port_ref) => {
646                port_ref.inner.post_with_headers(cx, headers, message)
647            }
648            EitherPortRef::Once(once_port_ref) => {
649                let port = once_port_ref
650                    .inner
651                    .take()
652                    .ok_or_else(|| anyhow::anyhow!("OncePortRef already used"))?;
653                port.post_with_headers(cx, headers, message);
654            }
655        }
656        Ok(())
657    }
658}
659
660#[derive(Debug, Named)]
661struct PythonReducer(Py<PyAny>);
662
663impl PythonReducer {
664    fn new(params: Option<wirevalue::Any>) -> anyhow::Result<Self> {
665        let p = params.ok_or_else(|| anyhow::anyhow!("params cannot be None"))?;
666        let obj: PickledPyObject = p.deserialized()?;
667        Ok(monarch_with_gil_blocking(
668            |py: Python<'_>| -> PyResult<Self> {
669                let unpickled = obj.unpickle(py)?;
670                Ok(Self(unpickled.unbind()))
671            },
672        )?)
673    }
674}
675
676impl CommReducer for PythonReducer {
677    type Update = PythonMessage;
678
679    fn reduce(&self, left: Self::Update, right: Self::Update) -> anyhow::Result<Self::Update> {
680        monarch_with_gil_blocking(|py: Python<'_>| -> PyResult<PythonMessage> {
681            let result = self.0.call(py, (left, right), None)?;
682            result.extract::<PythonMessage>(py)
683        })
684        .map_err(Into::into)
685    }
686}
687
688struct PythonAccumulator {
689    accumulator: Py<PyAny>,
690    reducer: Option<wirevalue::Any>,
691}
692
693impl PythonAccumulator {
694    fn new(py: Python<'_>, accumulator: Py<PyAny>) -> PyResult<Self> {
695        let py_reducer = accumulator.getattr(py, "reducer")?;
696        let reducer: Option<wirevalue::Any> = if py_reducer.is_none(py) {
697            None
698        } else {
699            let pickled = PickledPyObject::cloudpickle(py_reducer.bind(py))?;
700            Some(
701                wirevalue::Any::serialize(&pickled)
702                    .map_err(|e| PyRuntimeError::new_err(e.to_string()))?,
703            )
704        };
705
706        Ok(Self {
707            accumulator,
708            reducer,
709        })
710    }
711}
712
713impl Accumulator for PythonAccumulator {
714    type State = PythonMessage;
715    type Update = PythonMessage;
716
717    fn accumulate(&self, state: &mut Self::State, update: Self::Update) -> anyhow::Result<()> {
718        monarch_with_gil_blocking(|py: Python<'_>| -> PyResult<()> {
719            // Initialize state if it is empty.
720            if matches!(state.kind, PythonMessageKind::Uninit {}) {
721                *state = self
722                    .accumulator
723                    .getattr(py, "initial_state")?
724                    .extract::<PythonMessage>(py)?;
725            }
726
727            // TODO(pzhang) Make accumulate consumes state and update, and returns
728            // a new state. That will avoid this clone.
729            let old_state = state.clone();
730            let result = self.accumulator.call(py, (old_state, update), None)?;
731            *state = result.extract::<PythonMessage>(py)?;
732            Ok(())
733        })
734        .map_err(Into::into)
735    }
736
737    fn reducer_spec(&self) -> Option<ReducerSpec> {
738        self.reducer.as_ref().map(|r| ReducerSpec {
739            typehash: <PythonReducer as Named>::typehash(),
740            builder_params: Some(r.clone()),
741        })
742    }
743}
744
745inventory::submit! {
746    ReducerFactory {
747        typehash_f: <PythonReducer as Named>::typehash,
748        builder_f: |params| Ok(Box::new(PythonReducer::new(params)?)),
749    }
750}
751
752py_global!(point, "monarch._src.actor.actor_mesh", "Point");
753
754pub fn register_python_bindings(hyperactor_mod: &Bound<'_, PyModule>) -> PyResult<()> {
755    hyperactor_mod.add_class::<PyMailbox>()?;
756    hyperactor_mod.add_class::<PyPortId>()?;
757    hyperactor_mod.add_class::<PythonPortHandle>()?;
758    hyperactor_mod.add_class::<PythonPortRef>()?;
759    hyperactor_mod.add_class::<PythonPortReceiver>()?;
760    hyperactor_mod.add_class::<PythonOncePortHandle>()?;
761    hyperactor_mod.add_class::<PythonOncePortRef>()?;
762    hyperactor_mod.add_class::<PythonOncePortReceiver>()?;
763    hyperactor_mod.add_class::<PythonUndeliverableMessageEnvelope>()?;
764    Ok(())
765}