Appendix: pytokio (Python/Rust async bridge)#
pytokio#
Monarch needs a way for Python code to kick off Rust futures and later await the result. pytokio.rs provides that.
It does three things:
- lets Rust bundle up a future as a Python object (PythonTask);
- runs the future on a Tokio runtime;
- delivers the result back to Python in an await-friendly way.
1. Core types#
There are three main types in pytokio.rs:
- PythonTask (Rust struct, not exposed directly to Python)
- PyPythonTask (#[pyclass(name = "PythonTask")], the Python-visible wrapper)
- PyShared (the handle you actually await on the Python side)
2. PythonTask: “I have a Rust future”#
At the bottom we have:
pub(crate) struct PythonTask {
    future: Mutex<Pin<Box<dyn Future<Output = PyResult<PyObject>> + Send + 'static>>>,
    traceback: Option<PyObject>,
}
This is just:
- a boxed Rust future that produces a PyResult<PyObject>, and
- an optional captured Python traceback.
Bindings create one of these by calling PyPythonTask::new(async move { ... }), which in turn makes a PythonTask.
3. PyPythonTask: the Python-visible task#
#[pyclass(
    name = "PythonTask",
    module = "monarch._rust_bindings.monarch_hyperactor.pytokio"
)]
pub struct PyPythonTask {
    inner: Option<PythonTask>,
}
Important points:
- it owns exactly one PythonTask;
- that task is consumed when you run it (via spawn, block_on, etc.);
- it offers a few driving methods: spawn(), spawn_abortable(), block_on(), __await__, with_timeout(...).
This is what the higher-level bindings return.
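The "consumed when you run it" rule can be illustrated with a pure-Python analogy. This is only a sketch: OneShotTask, its take_task method, and the error message are hypothetical names chosen for illustration, not the Monarch API. The attribute _inner plays the role of inner: Option<PythonTask>.

```python
import asyncio


class OneShotTask:
    """Hypothetical stand-in for a wrapper that owns a single coroutine."""

    def __init__(self, coro):
        # Plays the role of `inner: Option<PythonTask>` on the Rust side.
        self._inner = coro

    def take_task(self):
        # Hand the coroutine out once and leave None behind,
        # the analogue of calling Option::take in Rust.
        if self._inner is None:
            raise RuntimeError("task already consumed")
        coro, self._inner = self._inner, None
        return coro

    def block_on(self):
        # Consume the task and run it to completion on the calling thread.
        return asyncio.run(self.take_task())


async def compute():
    await asyncio.sleep(0)
    return 42


task = OneShotTask(compute())
first = task.block_on()

# A second attempt to drive the same task fails because it was consumed.
try:
    task.block_on()
    second_error = None
except RuntimeError as exc:
    second_error = str(exc)
```

Whichever driving method is called first takes the future out of the wrapper, so every later call sees an empty slot and reports an error instead of running the work twice.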
4. How the future actually runs (spawn path)#
pub(crate) fn spawn(&mut self) -> PyResult<PyShared> {
    let (tx, rx) = watch::channel(None);
    let traceback = self.traceback()?;
    let traceback1 = self.traceback()?;
    let task = self.take_task()?;
    let handle = get_tokio_runtime().spawn(async move {
        send_result(tx, task.await, traceback1);
    });
    Ok(PyShared {
        rx,
        handle,
        abort: false,
        traceback,
    })
}
What that does:
- open a Tokio watch channel (tx, rx);
- consume the underlying Rust future (take_task());
- run that future on the shared Tokio runtime (get_tokio_runtime().spawn(...));
- when it completes, push the result into the watch channel (send_result(...));
- return a PyShared that holds the receiver and the join handle.
So the Rust future is running in the background, and Python will wait on the receiver.
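The same shape can be sketched in plain Python, as an analogy rather than the actual implementation. Here a background asyncio loop stands in for the shared Tokio runtime, and the hypothetical Shared class stands in for PyShared: it holds a result slot that starts empty (like watch::channel(None)) plus a handle to the running work. All names in this sketch are assumptions made for illustration.

```python
import asyncio
import threading

# One background event loop, standing in for the shared Tokio runtime.
_loop = asyncio.new_event_loop()
threading.Thread(target=_loop.run_forever, daemon=True).start()


class Shared:
    """Hypothetical analogue of PyShared: a result slot plus a task handle."""

    def __init__(self):
        self._done = threading.Event()
        self._result = None  # empty until the task finishes
        self.handle = None   # concurrent.futures.Future of the background work

    def _send(self, outcome):
        # Analogue of send_result(...): publish the outcome, wake any waiter.
        self._result = outcome
        self._done.set()

    def wait(self, timeout=None):
        if not self._done.wait(timeout):
            raise TimeoutError("task still running")
        ok, value = self._result
        if ok:
            return value
        raise value


def spawn(coro):
    shared = Shared()

    async def runner():
        # Run the task, then push success or failure into the slot.
        try:
            shared._send((True, await coro))
        except Exception as exc:
            shared._send((False, exc))

    shared.handle = asyncio.run_coroutine_threadsafe(runner(), _loop)
    return shared


async def add(a, b):
    await asyncio.sleep(0.01)
    return a + b


shared = spawn(add(2, 3))        # returns immediately
result = shared.wait(timeout=5)  # the caller waits on the receiving side
```

The point of the split is that starting the work and observing its result are separate steps: spawn returns as soon as the work is scheduled, and the caller picks its own moment to wait.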
6. block_on(...) for sync Python#
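fn block_on(mut slf: PyRefMut<PyPythonTask>, py: Python<'_>) -> PyResult<PyObject> {
    // Take the future out of the wrapper; the task is consumed here.
    let task = slf.take_task()?;
    // Release the mutable borrow on the Python object before blocking.
    drop(slf);
    signal_safe_block_on(py, task)?
}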
This is the same idea, but instead of returning a PyShared, it blocks the current thread and runs the future to completion, using signal_safe_block_on(...) so Ctrl‑C still works.
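One common way to keep a blocking wait responsive to Ctrl-C is to wait in short slices rather than in a single unbounded call. The sketch below shows that technique in plain Python. It is an assumption about the general approach, not a description of how signal_safe_block_on is written, and the block_on function and poll_interval parameter here are hypothetical.

```python
import asyncio
import concurrent.futures
import threading

# Background event loop standing in for the shared runtime.
_loop = asyncio.new_event_loop()
threading.Thread(target=_loop.run_forever, daemon=True).start()


def block_on(coro, poll_interval=0.1):
    """Block the calling thread until coro finishes on the background loop."""
    future = asyncio.run_coroutine_threadsafe(coro, _loop)
    try:
        while True:
            # Wait in short slices so the interpreter regularly regains
            # control and can deliver a pending KeyboardInterrupt.
            done, _ = concurrent.futures.wait([future], timeout=poll_interval)
            if done:
                # Returns the value or re-raises the task's exception.
                return future.result()
    except KeyboardInterrupt:
        # Ask the background task to stop before propagating the interrupt.
        future.cancel()
        raise


async def double(x):
    await asyncio.sleep(0.01)
    return x * 2


answer = block_on(double(21))
```

Unlike the spawn path, nothing is returned to wait on later: the caller gets the final value (or the exception) directly.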
7. from_coroutine(...): wrap a Python coroutine in the same machinery#
There is also:
#[staticmethod]
fn from_coroutine(py: Python<'_>, coro: PyObject) -> PyResult<PyPythonTask> { ... }
This is the “other direction”: you already have a Python coroutine, but you want to run it under the same task runner so it can await these Rust-backed tasks and keep the Monarch context vars. The implementation:
- calls the coroutine’s __await__,
- drives it in a loop,
- when the coroutine yields another PythonTask, it awaits that on Tokio,
- and it keeps restoring the Monarch context() inside the loop.
The context() preservation is critical for the actor model: when you call context() inside a PythonTask, you get the actor instance that spawned the task, even though you’re running on an arbitrary Tokio thread.
So both “Rust future exposed to Python” and “Python coroutine driven by the same runner” go through the same PyPythonTask shape.
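The driving loop described above can be sketched in pure Python. This is a simplified model, not the real implementation: Yielded is a hypothetical stand-in for a Rust-backed task, current_actor is a hypothetical context variable standing in for Monarch's context(), and "awaiting on Tokio" is replaced by a synchronous computation.

```python
import contextvars

# Hypothetical stand-in for the Monarch context() state.
current_actor = contextvars.ContextVar("current_actor", default=None)


class Yielded:
    """Hypothetical awaitable standing in for a Rust-backed task."""

    def __init__(self, value):
        self.value = value

    def __await__(self):
        # Yield ourselves to the driver; it sends the result back in.
        result = yield self
        return result


def drive(coro, ctx):
    """Step coro by hand, running every step inside the captured context."""
    it = coro.__await__()
    to_send = None
    while True:
        try:
            # ctx.run restores the captured context for this step, so
            # current_actor.get() sees the actor that created the task.
            yielded = ctx.run(it.send, to_send)
        except StopIteration as stop:
            return stop.value
        # Placeholder for "await the yielded task on the runtime".
        to_send = yielded.value * 2


async def actor_code():
    a = await Yielded(10)
    b = await Yielded(a)
    return current_actor.get(), a, b


current_actor.set("actor-7")
ctx = contextvars.copy_context()  # captured where the task is created
current_actor.set(None)           # simulate running somewhere else later

outcome = drive(actor_code(), ctx)
```

Because each step is entered through the captured context, the coroutine observes the creating actor even though the driver itself runs with a different value set.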
8. What it’s for#
Every v1 binding that needs to do async Rust work does the same thing:
- build an async Rust block that calls the real API;
- wrap it in PyPythonTask::new(...);
- return that to Python.
User code then decides how to drive the task: await it, call .spawn(), or call .block_on().