monarch_hyperactor/
mailbox.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9use std::hash::DefaultHasher;
10use std::hash::Hash;
11use std::hash::Hasher;
12use std::ops::Deref;
13use std::sync::Arc;
14
15use hyperactor::Mailbox;
16use hyperactor::OncePortHandle;
17use hyperactor::PortHandle;
18use hyperactor::accum::Accumulator;
19use hyperactor::accum::CommReducer;
20use hyperactor::accum::ReducerFactory;
21use hyperactor::accum::ReducerSpec;
22use hyperactor::mailbox::MailboxSender;
23use hyperactor::mailbox::MessageEnvelope;
24use hyperactor::mailbox::OncePortReceiver;
25use hyperactor::mailbox::PortReceiver;
26use hyperactor::mailbox::Undeliverable;
27use hyperactor::mailbox::monitored_return_handle;
28use hyperactor::message::Bind;
29use hyperactor::message::Bindings;
30use hyperactor::message::Unbind;
31use hyperactor::reference;
32use hyperactor_config::Flattrs;
33use monarch_types::PickledPyObject;
34use monarch_types::py_global;
35use pyo3::IntoPyObjectExt;
36use pyo3::exceptions::PyEOFError;
37use pyo3::exceptions::PyRuntimeError;
38use pyo3::exceptions::PyValueError;
39use pyo3::prelude::*;
40use pyo3::types::PyTuple;
41use pyo3::types::PyType;
42use serde::Deserialize;
43use serde::Serialize;
44use typeuri::Named;
45
46use crate::actor::PythonMessage;
47use crate::actor::PythonMessageKind;
48use crate::context::PyInstance;
49use crate::proc::PyActorId;
50use crate::pytokio::PyPythonTask;
51use crate::pytokio::PythonTask;
52use crate::runtime::monarch_with_gil;
53use crate::runtime::monarch_with_gil_blocking;
54
55#[derive(Clone, Debug)]
56#[pyclass(
57    name = "Mailbox",
58    module = "monarch._rust_bindings.monarch_hyperactor.mailbox"
59)]
60pub struct PyMailbox {
61    pub(super) inner: Mailbox,
62}
63
64impl PyMailbox {
65    pub fn get_inner(&self) -> &Mailbox {
66        &self.inner
67    }
68}
69
70#[pymethods]
71impl PyMailbox {
72    fn open_port<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyTuple>> {
73        let (handle, receiver) = self.inner.open_port();
74        let handle = Py::new(py, PythonPortHandle { inner: handle })?;
75        let receiver = Py::new(
76            py,
77            PythonPortReceiver {
78                inner: Arc::new(tokio::sync::Mutex::new(receiver)),
79            },
80        )?;
81        PyTuple::new(py, vec![handle.into_any(), receiver.into_any()])
82    }
83
84    fn open_once_port<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyTuple>> {
85        let (handle, receiver) = self.inner.open_once_port();
86        let handle = Py::new(
87            py,
88            PythonOncePortHandle {
89                inner: Some(handle),
90            },
91        )?;
92        let receiver = Py::new(
93            py,
94            PythonOncePortReceiver {
95                inner: Arc::new(std::sync::Mutex::new(Some(receiver))),
96            },
97        )?;
98        PyTuple::new(py, vec![handle.into_any(), receiver.into_any()])
99    }
100
101    fn open_accum_port<'py>(
102        &self,
103        py: Python<'py>,
104        accumulator: Py<PyAny>,
105    ) -> PyResult<Bound<'py, PyTuple>> {
106        let py_accumulator = PythonAccumulator::new(py, accumulator)?;
107        let (handle, receiver) = self.inner.open_accum_port(py_accumulator);
108        let handle = Py::new(py, PythonPortHandle { inner: handle })?;
109        let receiver = Py::new(
110            py,
111            PythonPortReceiver {
112                inner: Arc::new(tokio::sync::Mutex::new(receiver)),
113            },
114        )?;
115        PyTuple::new(py, vec![handle.into_any(), receiver.into_any()])
116    }
117
118    pub(super) fn post(&self, dest: &PyActorId, message: &PythonMessage) -> PyResult<()> {
119        let port_id = dest.inner.port_id(PythonMessage::port());
120        let message = wirevalue::Any::serialize(message).map_err(|err| {
121            PyRuntimeError::new_err(format!(
122                "failed to serialize message ({:?}) to Any: {}",
123                message, err
124            ))
125        })?;
126        let envelope = MessageEnvelope::new(
127            self.inner.actor_id().clone(),
128            port_id,
129            message,
130            Flattrs::new(),
131        );
132        let return_handle = self
133            .inner
134            .bound_return_handle()
135            .unwrap_or(monitored_return_handle());
136        self.inner.post(envelope, return_handle);
137        Ok(())
138    }
139
140    #[getter]
141    pub(super) fn actor_id(&self) -> PyActorId {
142        PyActorId {
143            inner: self.inner.actor_id().clone(),
144        }
145    }
146
147    fn __repr__(&self) -> String {
148        format!("{:?}", self.inner)
149    }
150}
151
152#[pyclass(
153    frozen,
154    name = "PortId",
155    module = "monarch._rust_bindings.monarch_hyperactor.mailbox"
156)]
157#[derive(Clone)]
158pub struct PyPortId {
159    inner: reference::PortId,
160}
161
162impl From<reference::PortId> for PyPortId {
163    fn from(port_id: reference::PortId) -> Self {
164        Self { inner: port_id }
165    }
166}
167
168impl From<PyPortId> for reference::PortId {
169    fn from(port_id: PyPortId) -> Self {
170        port_id.inner
171    }
172}
173
174impl From<Mailbox> for PyMailbox {
175    fn from(inner: Mailbox) -> Self {
176        PyMailbox { inner }
177    }
178}
179
180#[pymethods]
181impl PyPortId {
182    #[new]
183    #[pyo3(signature = (*, actor_id, port))]
184    fn new(actor_id: &PyActorId, port: u64) -> Self {
185        Self {
186            inner: reference::PortId::new(actor_id.inner.clone(), port),
187        }
188    }
189
190    #[staticmethod]
191    fn from_string(port_id: &str) -> PyResult<Self> {
192        Ok(Self {
193            inner: port_id.parse().map_err(|e| {
194                PyValueError::new_err(format!("Failed to parse port id '{}': {}", port_id, e))
195            })?,
196        })
197    }
198
199    #[getter]
200    fn actor_id(&self) -> PyActorId {
201        PyActorId {
202            inner: self.inner.actor_id().clone(),
203        }
204    }
205
206    #[getter]
207    fn index(&self) -> u64 {
208        self.inner.index()
209    }
210
211    fn __repr__(&self) -> String {
212        self.inner.to_string()
213    }
214
215    fn __hash__(&self) -> u64 {
216        let mut hasher = DefaultHasher::new();
217        self.inner.to_string().hash(&mut hasher);
218        hasher.finish()
219    }
220
221    fn __eq__(&self, other: &Bound<'_, PyAny>) -> PyResult<bool> {
222        if let Ok(other) = other.extract::<PyPortId>() {
223            Ok(self.inner == other.inner)
224        } else {
225            Ok(false)
226        }
227    }
228
229    fn __reduce__<'py>(slf: &Bound<'py, Self>) -> PyResult<(Bound<'py, PyAny>, (String,))> {
230        Ok((slf.getattr("from_string")?, (slf.borrow().__repr__(),)))
231    }
232}
233
234impl std::fmt::Debug for PyPortId {
235    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
236        self.inner.fmt(f)
237    }
238}
239
240#[derive(Clone, Debug)]
241#[pyclass(
242    name = "PortHandle",
243    module = "monarch._rust_bindings.monarch_hyperactor.mailbox"
244)]
245pub(crate) struct PythonPortHandle {
246    inner: PortHandle<PythonMessage>,
247}
248
249impl PythonPortHandle {
250    pub(crate) fn new(inner: PortHandle<PythonMessage>) -> Self {
251        Self { inner }
252    }
253}
254
255#[pymethods]
256impl PythonPortHandle {
257    fn send(&self, instance: &PyInstance, message: PythonMessage) -> PyResult<()> {
258        self.inner
259            .send(instance.deref(), message)
260            .map_err(|err| PyErr::new::<PyEOFError, _>(format!("Port closed: {}", err)))?;
261        Ok(())
262    }
263
264    fn bind(&self) -> PythonPortRef {
265        PythonPortRef {
266            inner: self.inner.bind(),
267        }
268    }
269}
270
271#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
272#[pyclass(
273    name = "PortRef",
274    module = "monarch._rust_bindings.monarch_hyperactor.mailbox"
275)]
276pub struct PythonPortRef {
277    pub(crate) inner: reference::PortRef<PythonMessage>,
278}
279
280#[pymethods]
281impl PythonPortRef {
282    #[new]
283    fn new(port: PyPortId) -> Self {
284        Self {
285            inner: reference::PortRef::attest(port.into()),
286        }
287    }
288    fn __reduce__<'py>(
289        slf: Bound<'py, PythonPortRef>,
290    ) -> PyResult<(Bound<'py, PyType>, (PyPortId,))> {
291        let id: PyPortId = (*slf.borrow()).inner.port_id().clone().into();
292        Ok((slf.get_type(), (id,)))
293    }
294
295    fn send(&self, instance: &PyInstance, message: PythonMessage) -> PyResult<()> {
296        self.inner
297            .send(instance.deref(), message)
298            .map_err(|err| PyErr::new::<PyEOFError, _>(format!("Port closed: {}", err)))?;
299        Ok(())
300    }
301
302    fn __repr__(&self) -> String {
303        self.inner.to_string()
304    }
305
306    #[getter]
307    fn port_id(&self) -> PyResult<PyPortId> {
308        Ok(self.inner.port_id().clone().into())
309    }
310
311    #[getter]
312    fn get_return_undeliverable(&self) -> bool {
313        self.inner.get_return_undeliverable()
314    }
315
316    #[setter]
317    fn set_return_undeliverable(&mut self, return_undeliverable: bool) {
318        self.inner.return_undeliverable(return_undeliverable);
319    }
320}
321
322impl From<reference::PortRef<PythonMessage>> for PythonPortRef {
323    fn from(port_ref: reference::PortRef<PythonMessage>) -> Self {
324        Self { inner: port_ref }
325    }
326}
327
328#[derive(Debug)]
329#[pyclass(
330    name = "PortReceiver",
331    module = "monarch._rust_bindings.monarch_hyperactor.mailbox"
332)]
333pub(super) struct PythonPortReceiver {
334    inner: Arc<tokio::sync::Mutex<PortReceiver<PythonMessage>>>,
335}
336
337async fn recv_async(
338    receiver: Arc<tokio::sync::Mutex<PortReceiver<PythonMessage>>>,
339) -> PyResult<Py<PyAny>> {
340    let message = receiver
341        .lock()
342        .await
343        .recv()
344        .await
345        .map_err(|err| PyErr::new::<PyEOFError, _>(format!("Port closed: {}", err)))?;
346
347    monarch_with_gil(|py| message.into_py_any(py)).await
348}
349
350#[pymethods]
351impl PythonPortReceiver {
352    fn recv_task(&mut self) -> PyResult<PyPythonTask> {
353        let receiver = self.inner.clone();
354        Ok(PythonTask::new(recv_async(receiver))?.into())
355    }
356}
357
358impl PythonPortReceiver {
359    #[allow(dead_code)]
360    pub(super) fn inner(&self) -> Arc<tokio::sync::Mutex<PortReceiver<PythonMessage>>> {
361        Arc::clone(&self.inner)
362    }
363}
364
365#[derive(Debug)]
366#[pyclass(
367    name = "UndeliverableMessageEnvelope",
368    module = "monarch._rust_bindings.monarch_hyperactor.mailbox"
369)]
370pub(crate) struct PythonUndeliverableMessageEnvelope {
371    pub(crate) inner: Option<Undeliverable<MessageEnvelope>>,
372}
373
374impl PythonUndeliverableMessageEnvelope {
375    fn inner(&self) -> PyResult<&Undeliverable<MessageEnvelope>> {
376        self.inner.as_ref().ok_or_else(|| {
377            PyErr::new::<PyRuntimeError, _>(
378                "PythonUndeliverableMessageEnvelope was already consumed",
379            )
380        })
381    }
382
383    pub(crate) fn take(&mut self) -> anyhow::Result<Undeliverable<MessageEnvelope>> {
384        self.inner.take().ok_or_else(|| {
385            anyhow::anyhow!("PythonUndeliverableMessageEnvelope was already consumed")
386        })
387    }
388}
389
390#[pymethods]
391impl PythonUndeliverableMessageEnvelope {
392    fn __repr__(&self) -> PyResult<String> {
393        Ok(format!(
394            "UndeliverableMessageEnvelope(sender={}, dest={}, error={})",
395            self.inner()?.0.sender(),
396            self.inner()?.0.dest(),
397            self.error_msg()?
398        ))
399    }
400
401    fn sender(&self) -> PyResult<PyActorId> {
402        Ok(PyActorId {
403            inner: self.inner()?.0.sender().clone(),
404        })
405    }
406
407    fn dest(&self) -> PyResult<PyPortId> {
408        Ok(self.inner()?.0.dest().clone().into())
409    }
410
411    fn error_msg(&self) -> PyResult<String> {
412        Ok(self
413            .inner()?
414            .0
415            .error_msg()
416            .unwrap_or_else(|| "None".to_string()))
417    }
418}
419
420#[derive(Debug)]
421#[pyclass(
422    name = "OncePortHandle",
423    module = "monarch._rust_bindings.monarch_hyperactor.mailbox"
424)]
425pub(super) struct PythonOncePortHandle {
426    inner: Option<OncePortHandle<PythonMessage>>,
427}
428
429#[pymethods]
430impl PythonOncePortHandle {
431    fn send(&mut self, instance: &PyInstance, message: PythonMessage) -> PyResult<()> {
432        let Some(port) = self.inner.take() else {
433            return Err(PyErr::new::<PyValueError, _>("OncePort is already used"));
434        };
435        port.send(instance.deref(), message)
436            .map_err(|err| PyErr::new::<PyEOFError, _>(format!("Port closed: {}", err)))?;
437        Ok(())
438    }
439
440    fn bind(&mut self) -> PyResult<PythonOncePortRef> {
441        let Some(port) = self.inner.take() else {
442            return Err(PyErr::new::<PyValueError, _>("OncePort is already used"));
443        };
444        Ok(PythonOncePortRef {
445            inner: Some(port.bind()),
446        })
447    }
448}
449
450#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
451#[pyclass(
452    name = "OncePortRef",
453    module = "monarch._rust_bindings.monarch_hyperactor.mailbox"
454)]
455pub struct PythonOncePortRef {
456    pub(crate) inner: Option<reference::OncePortRef<PythonMessage>>,
457}
458
459#[pymethods]
460impl PythonOncePortRef {
461    #[new]
462    fn new(port: Option<PyPortId>) -> Self {
463        Self {
464            inner: port.map(|port| reference::PortRef::attest(port.inner).into_once()),
465        }
466    }
467    fn __reduce__<'py>(
468        slf: Bound<'py, PythonOncePortRef>,
469    ) -> PyResult<(Bound<'py, PyType>, (Option<PyPortId>,))> {
470        let id: Option<PyPortId> = (*slf.borrow())
471            .inner
472            .as_ref()
473            .map(|x| x.port_id().clone().into());
474        Ok((slf.get_type(), (id,)))
475    }
476
477    fn send(&mut self, instance: &PyInstance, message: PythonMessage) -> PyResult<()> {
478        let Some(port_ref) = self.inner.take() else {
479            return Err(PyErr::new::<PyValueError, _>("OncePortRef is already used"));
480        };
481        port_ref
482            .send(instance.deref(), message)
483            .map_err(|err| PyErr::new::<PyEOFError, _>(format!("Port closed: {}", err)))?;
484        Ok(())
485    }
486
487    fn __repr__(&self) -> String {
488        self.inner
489            .as_ref()
490            .map_or("OncePortRef is already used".to_string(), |r| r.to_string())
491    }
492
493    #[getter]
494    fn port_id(&self) -> PyResult<PyPortId> {
495        Ok(self.inner.as_ref().unwrap().port_id().clone().into())
496    }
497
498    #[getter]
499    fn get_return_undeliverable(&self) -> bool {
500        self.inner.as_ref().unwrap().get_return_undeliverable()
501    }
502
503    #[setter]
504    fn set_return_undeliverable(&mut self, return_undeliverable: bool) {
505        if let Some(ref mut inner) = self.inner {
506            inner.return_undeliverable(return_undeliverable);
507        }
508    }
509}
510
511impl From<reference::OncePortRef<PythonMessage>> for PythonOncePortRef {
512    fn from(port_ref: reference::OncePortRef<PythonMessage>) -> Self {
513        Self {
514            inner: Some(port_ref),
515        }
516    }
517}
518
519#[pyclass(
520    name = "OncePortReceiver",
521    module = "monarch._rust_bindings.monarch_hyperactor.mailbox"
522)]
523pub(super) struct PythonOncePortReceiver {
524    inner: Arc<std::sync::Mutex<Option<OncePortReceiver<PythonMessage>>>>,
525}
526
527#[pymethods]
528impl PythonOncePortReceiver {
529    fn recv_task(&mut self) -> PyResult<PyPythonTask> {
530        let Some(receiver) = self.inner.lock().unwrap().take() else {
531            return Err(PyErr::new::<PyValueError, _>("OncePort is already used"));
532        };
533        let fut = async move {
534            let message = receiver
535                .recv()
536                .await
537                .map_err(|err| PyErr::new::<PyEOFError, _>(format!("Port closed: {}", err)))?;
538
539            monarch_with_gil(|py| message.into_py_any(py)).await
540        };
541        Ok(PythonTask::new(fut)?.into())
542    }
543}
544
545impl PythonOncePortReceiver {
546    #[allow(dead_code)]
547    pub(super) fn inner(&self) -> Arc<std::sync::Mutex<Option<OncePortReceiver<PythonMessage>>>> {
548        Arc::clone(&self.inner)
549    }
550}
551
552#[derive(
553    Clone,
554    Serialize,
555    Deserialize,
556    Named,
557    PartialEq,
558    FromPyObject,
559    IntoPyObject,
560    Debug
561)]
562pub enum EitherPortRef {
563    Unbounded(PythonPortRef),
564    Once(PythonOncePortRef),
565}
566
567impl Unbind for EitherPortRef {
568    fn unbind(&self, bindings: &mut Bindings) -> anyhow::Result<()> {
569        match self {
570            EitherPortRef::Unbounded(port_ref) => port_ref.inner.unbind(bindings),
571            EitherPortRef::Once(once_port_ref) => once_port_ref.inner.unbind(bindings),
572        }
573    }
574}
575
576impl Bind for EitherPortRef {
577    fn bind(&mut self, bindings: &mut Bindings) -> anyhow::Result<()> {
578        match self {
579            EitherPortRef::Unbounded(port_ref) => port_ref.inner.bind(bindings),
580            EitherPortRef::Once(once_port_ref) => once_port_ref.inner.bind(bindings),
581        }
582    }
583}
584
585impl EitherPortRef {
586    pub fn get_return_undeliverable(&self) -> bool {
587        match self {
588            EitherPortRef::Unbounded(port_ref) => port_ref.inner.get_return_undeliverable(),
589            EitherPortRef::Once(once_port_ref) => once_port_ref
590                .inner
591                .as_ref()
592                .is_some_and(|r| r.get_return_undeliverable()),
593        }
594    }
595
596    pub fn set_return_undeliverable(&mut self, return_undeliverable: bool) {
597        match self {
598            EitherPortRef::Unbounded(port_ref) => {
599                port_ref.inner.return_undeliverable(return_undeliverable);
600            }
601            EitherPortRef::Once(once_port_ref) => {
602                if let Some(ref mut inner) = once_port_ref.inner {
603                    inner.return_undeliverable(return_undeliverable);
604                }
605            }
606        }
607    }
608
609    /// Send a message through this port reference.
610    /// The message is first resolved for any pending pickle state before sending.
611    pub fn send(
612        &mut self,
613        cx: &impl hyperactor::context::Actor,
614        message: crate::actor::PythonMessage,
615    ) -> anyhow::Result<()> {
616        match self {
617            EitherPortRef::Unbounded(port_ref) => port_ref.inner.send(cx, message)?,
618            EitherPortRef::Once(once_port_ref) => {
619                let port = once_port_ref
620                    .inner
621                    .take()
622                    .ok_or_else(|| anyhow::anyhow!("OncePortRef already used"))?;
623                port.send(cx, message)?;
624            }
625        }
626        Ok(())
627    }
628}
629
630#[derive(Debug, Named)]
631struct PythonReducer(Py<PyAny>);
632
633impl PythonReducer {
634    fn new(params: Option<wirevalue::Any>) -> anyhow::Result<Self> {
635        let p = params.ok_or_else(|| anyhow::anyhow!("params cannot be None"))?;
636        let obj: PickledPyObject = p.deserialized()?;
637        Ok(monarch_with_gil_blocking(
638            |py: Python<'_>| -> PyResult<Self> {
639                let unpickled = obj.unpickle(py)?;
640                Ok(Self(unpickled.unbind()))
641            },
642        )?)
643    }
644}
645
646impl CommReducer for PythonReducer {
647    type Update = PythonMessage;
648
649    fn reduce(&self, left: Self::Update, right: Self::Update) -> anyhow::Result<Self::Update> {
650        monarch_with_gil_blocking(|py: Python<'_>| -> PyResult<PythonMessage> {
651            let result = self.0.call(py, (left, right), None)?;
652            result.extract::<PythonMessage>(py)
653        })
654        .map_err(Into::into)
655    }
656}
657
658struct PythonAccumulator {
659    accumulator: Py<PyAny>,
660    reducer: Option<wirevalue::Any>,
661}
662
663impl PythonAccumulator {
664    fn new<'py>(py: Python<'py>, accumulator: Py<PyAny>) -> PyResult<Self> {
665        let py_reducer = accumulator.getattr(py, "reducer")?;
666        let reducer: Option<wirevalue::Any> = if py_reducer.is_none(py) {
667            None
668        } else {
669            let pickled = PickledPyObject::cloudpickle(py_reducer.bind(py))?;
670            Some(
671                wirevalue::Any::serialize(&pickled)
672                    .map_err(|e| PyRuntimeError::new_err(e.to_string()))?,
673            )
674        };
675
676        Ok(Self {
677            accumulator,
678            reducer,
679        })
680    }
681}
682
683impl Accumulator for PythonAccumulator {
684    type State = PythonMessage;
685    type Update = PythonMessage;
686
687    fn accumulate(&self, state: &mut Self::State, update: Self::Update) -> anyhow::Result<()> {
688        monarch_with_gil_blocking(|py: Python<'_>| -> PyResult<()> {
689            // Initialize state if it is empty.
690            if matches!(state.kind, PythonMessageKind::Uninit {}) {
691                *state = self
692                    .accumulator
693                    .getattr(py, "initial_state")?
694                    .extract::<PythonMessage>(py)?;
695            }
696
697            // TODO(pzhang) Make accumulate consumes state and update, and returns
698            // a new state. That will avoid this clone.
699            let old_state = state.clone();
700            let result = self.accumulator.call(py, (old_state, update), None)?;
701            *state = result.extract::<PythonMessage>(py)?;
702            Ok(())
703        })
704        .map_err(Into::into)
705    }
706
707    fn reducer_spec(&self) -> Option<ReducerSpec> {
708        self.reducer.as_ref().map(|r| ReducerSpec {
709            typehash: <PythonReducer as Named>::typehash(),
710            builder_params: Some(r.clone()),
711        })
712    }
713}
714
715inventory::submit! {
716    ReducerFactory {
717        typehash_f: <PythonReducer as Named>::typehash,
718        builder_f: |params| Ok(Box::new(PythonReducer::new(params)?)),
719    }
720}
721
722py_global!(point, "monarch._src.actor.actor_mesh", "Point");
723
724pub fn register_python_bindings(hyperactor_mod: &Bound<'_, PyModule>) -> PyResult<()> {
725    hyperactor_mod.add_class::<PyMailbox>()?;
726    hyperactor_mod.add_class::<PyPortId>()?;
727    hyperactor_mod.add_class::<PythonPortHandle>()?;
728    hyperactor_mod.add_class::<PythonPortRef>()?;
729    hyperactor_mod.add_class::<PythonPortReceiver>()?;
730    hyperactor_mod.add_class::<PythonOncePortHandle>()?;
731    hyperactor_mod.add_class::<PythonOncePortRef>()?;
732    hyperactor_mod.add_class::<PythonOncePortReceiver>()?;
733    hyperactor_mod.add_class::<PythonUndeliverableMessageEnvelope>()?;
734    Ok(())
735}