monarch_hyperactor/pywaker.rs
1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9//! Pywaker provides a GIL-free way to wake up Python event loops from Rust.
10//! This is accomplished by writing a single byte to a "self pipe" for every
11//! wake up. The corresponding read-side FD is added to the Python-side event
12//! loop.
13//!
14//! The approach does not add overhead in uncontended scenarios:
15//!
16//! ```ignore
17//! | Approach | Mean | Notes |
18//! |----------------------|--------|---------------------------------|
19//! | FD-based | ~48 µs | No GIL acquisition on Rust side |
20//! | call_soon_threadsafe | ~56 µs | Acquires GIL on Rust side |
21//! | Pure Python | ~44 µs | Baseline for comparison |
22//! ```
23//!
24//! ([Full benchmarks](https://github.com/mariusae/tmp/tree/main/2025-12-16-wakerbench)).
25//!
26//! ## Implementation notes
27//!
28//! [`event`] creates a pipe(2), and wakeup events are implemented by writing a single
29//! byte into the pipe. The Python (read side) event registers the read fd into its
30//! asyncio event loop. Thus the pipe acts as a notification queue. When the fd becomes
//! readable, the Python event drains the pipe and then sets an `asyncio.Event` to notify
32//! any waiters.
33//!
34//! The reader side is always single threaded (it is bound to a specific event loop), and
35//! thus free of race conditions. This is because all read operations occur within a single
36//! asyncio event loop, which processes events sequentially on one thread, eliminating the
37//! possibility of concurrent access to the read side of the pipe.
38
39use std::os::fd::FromRawFd;
40use std::os::fd::IntoRawFd;
41use std::os::fd::OwnedFd;
42use std::os::fd::RawFd;
43
44use monarch_types::MapPyErr;
45use nix::errno::Errno;
46use nix::fcntl::FcntlArg;
47use nix::fcntl::OFlag;
48use nix::fcntl::fcntl;
49use nix::unistd::pipe;
50use nix::unistd::write;
51use pyo3::Bound;
52use pyo3::Py;
53use pyo3::PyAny;
54use pyo3::PyResult;
55use pyo3::prelude::*;
56use pyo3::pyclass;
57use pyo3::types::PyModule;
58use pyo3::types::PyModuleMethods;
59
60/// Waker is is a handle to a [`PyEvent`].
61#[derive(Debug)]
62pub struct Waker {
63 write_fd: OwnedFd,
64}
65
66impl Waker {
67 /// Wake up any Python waiters. This sets the corresponding event, which
68 /// remains set until it is cleared by the Python event loop.
69 pub fn wake(&self) -> Result<bool, nix::Error> {
70 static DATA: [u8; 1] = [b'w'];
71
72 match write(&self.write_fd, &DATA) {
73 Ok(_) => Ok(true),
74 // Pipe is full. This is ok.
75 Err(Errno::EAGAIN) => Ok(true),
76 // The Python side closed the pipe. It is no longer listening.
77 Err(Errno::EPIPE) => Ok(false),
78 Err(e) => Err(e),
79 }
80 }
81}
82
83/// Wakers are not intended to be used from Python; TestWaker
84/// is provided to faciliate pure-Python unit testing.
85// NOTE: We can't use a Python calss name that starts with "Test" since
86// during Python testing, Pytest will inspect anything that starts with
87// "Test" and check if its callable which in pyo3 >= 0.26 will raise
88// a TypeError.
89#[pyclass(name = "PyTestWaker", module = "monarch._src.actor.waker")]
90struct TestPyWaker(Waker);
91
92#[pymethods]
93impl TestPyWaker {
94 fn wake(&self) -> PyResult<bool> {
95 self.0.wake().map_pyerr()
96 }
97
98 #[staticmethod]
99 fn create() -> PyResult<(TestPyWaker, PyEvent)> {
100 let (waker, event) = event().map_pyerr()?;
101 Ok((TestPyWaker(waker), event))
102 }
103}
104
105/// An event that is awoken by a [`Waker`].
106#[pyclass(name = "Event", module = "monarch._src.actor.waker")]
107pub struct PyEvent {
108 #[pyo3(get, name = "_read_fd")]
109 read_fd: RawFd,
110
111 #[pyo3(get, set, name = "_event_loop")]
112 event_loop: Option<Py<PyAny>>,
113
114 #[pyo3(get, set, name = "_event")]
115 event: Option<Py<PyAny>>,
116}
117
118impl Drop for PyEvent {
119 fn drop(&mut self) {
120 // SAFETY: fd was obtained via into_raw_fd() in the event() function
121 let _ = unsafe { OwnedFd::from_raw_fd(self.read_fd) };
122 }
123}
124
125/// Create a new event, returning the (Rust only) [`Waker`], and
126/// a Python [`PyEvent`], intended for passing to Python code.
127pub fn event() -> Result<(Waker, PyEvent), nix::Error> {
128 let (read_fd, write_fd) = pipe()?;
129
130 set_nonblocking(&read_fd)?;
131 set_nonblocking(&write_fd)?;
132
133 Ok((
134 Waker { write_fd },
135 PyEvent {
136 read_fd: read_fd.into_raw_fd(),
137 event_loop: None,
138 event: None,
139 },
140 ))
141}
142
143fn set_nonblocking(fd: &OwnedFd) -> Result<(), nix::Error> {
144 let flags = OFlag::from_bits_truncate(fcntl(fd, FcntlArg::F_GETFL)?) | OFlag::O_NONBLOCK;
145 fcntl(fd, FcntlArg::F_SETFL(flags))?;
146 Ok(())
147}
148
149pub fn register_python_bindings(hyperactor_mod: &Bound<'_, PyModule>) -> PyResult<()> {
150 hyperactor_mod.add_class::<PyEvent>()?;
151 hyperactor_mod.add_class::<TestPyWaker>()?;
152 Ok(())
153}