monarch_hyperactor/buffers.rs
1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9#![allow(unsafe_op_in_unsafe_fn)]
10
11use std::ffi::c_int;
12use std::ffi::c_void;
13
14use bytes::Buf;
15use bytes::Bytes;
16use bytes::BytesMut;
17use hyperactor_config::CONFIG;
18use hyperactor_config::ConfigAttr;
19use hyperactor_config::attrs::declare_attrs;
20use pyo3::buffer::PyBuffer;
21use pyo3::prelude::*;
22use pyo3::types::PyBytes;
23use pyo3::types::PyBytesMethods;
24use serde::Deserialize;
25use serde::Serialize;
26use serde_multipart::Part;
27use typeuri::Named;
28
declare_attrs! {
    /// Threshold below which writes are copied into a contiguous buffer.
    /// Writes >= this size are stored as zero-copy references.
    ///
    /// Read by `Buffer` when it is constructed; the default is 256 bytes.
    // NOTE(review): the two `ConfigAttr::new` arguments look like an environment
    // variable name and a config key respectively — confirm against
    // `hyperactor_config::ConfigAttr::new`.
    @meta(CONFIG = ConfigAttr::new(
        Some("MONARCH_HYPERACTOR_SMALL_WRITE_THRESHOLD".to_string()),
        Some("small_write_threshold".to_string()),
    ))
    pub attr SMALL_WRITE_THRESHOLD: usize = 256;
}
38
/// Wrapper that keeps a `Py<PyBytes>` alive while allowing zero-copy access to its memory.
///
/// Serves as the owner handed to `bytes::Bytes::from_owner`, so the resulting
/// `Bytes` points directly at the Python object's data. `ptr` and `len` are
/// captured once at construction and stay valid because `_py_bytes` holds a
/// strong reference to the (immutable) Python `bytes` object for as long as
/// this wrapper exists.
struct KeepPyBytesAlive {
    /// Strong reference that keeps the Python object, and therefore `ptr`, alive.
    _py_bytes: Py<PyBytes>,
    /// Start of the Python object's byte data.
    ptr: *const u8,
    /// Number of bytes available at `ptr`.
    len: usize,
}
45
46impl KeepPyBytesAlive {
47 fn new(py_bytes: Py<PyBytes>) -> Self {
48 let (ptr, len) = Python::attach(|py| {
49 let bytes_ref = py_bytes.as_bytes(py);
50 (bytes_ref.as_ptr(), bytes_ref.len())
51 });
52 Self {
53 _py_bytes: py_bytes,
54 ptr,
55 len,
56 }
57 }
58}
59
60impl AsRef<[u8]> for KeepPyBytesAlive {
61 fn as_ref(&self) -> &[u8] {
62 // SAFETY: ptr is valid as long as py_bytes is alive (kept alive by Py<PyBytes>)
63 // Python won't free the memory until the Py<PyBytes> refcount reaches 0
64 unsafe { std::slice::from_raw_parts(self.ptr, self.len) }
65 }
66}
67
// SAFETY: The only field that is not `Send` on its own is the raw `ptr`, which
// points into the data of an immutable Python `bytes` object. That object is
// kept alive by the `Py<PyBytes>` (which PyO3 makes `Send`), and its bytes are
// never mutated, so moving the wrapper to another thread cannot produce a
// dangling or racing access.
unsafe impl Send for KeepPyBytesAlive {}
// SAFETY: The wrapper only hands out shared, read-only access to the bytes (via
// `AsRef<[u8]>`), and the underlying Python `bytes` object is immutable, so
// concurrent `&KeepPyBytesAlive` access from several threads is data-race free.
unsafe impl Sync for KeepPyBytesAlive {}
72
/// A fragment of data in the buffer, either a copy or a reference.
///
/// `Buffer` keeps fragments in write order; `Buffer::take_part` hands them, in
/// that order, to `Part::from_fragments`.
#[derive(Clone)]
enum Fragment {
    /// Small writes (below the buffer's threshold) that were copied and batched
    /// into one contiguous buffer
    Copy(Bytes),
    /// Large writes stored as references to Python bytes, without copying
    Reference(Py<PyBytes>),
}
81
/// A mutable buffer for reading and writing bytes data.
///
/// The `Buffer` struct provides a hybrid interface for accumulating byte data:
/// - Small writes (shorter than the threshold taken from `SMALL_WRITE_THRESHOLD`,
///   256 bytes by default) are copied into a contiguous buffer to minimize fragment overhead
/// - Large writes (at least the threshold) are stored as zero-copy references to Python bytes objects
///
/// This approach balances the overhead of per-fragment processing against the cost of copying data.
///
/// # Examples
///
/// ```python
/// from monarch._rust_bindings.monarch_hyperactor.buffers import Buffer
///
/// # Create a new buffer
/// buffer = Buffer()
///
/// # Write some data - small writes are batched, large writes are zero-copy
/// # (the comments below assume the default 256-byte threshold)
/// buffer.write(b"small") # copied into pending buffer
/// buffer.write(b"x" * 1000) # stored as zero-copy reference
/// ```
#[pyclass(subclass, module = "monarch._rust_bindings.monarch_hyperactor.buffers")]
#[derive(Clone)]
pub struct Buffer {
    /// Finalized fragments in write order
    fragments: Vec<Fragment>,
    /// Accumulator for pending small writes. It is flushed into `fragments` as a
    /// single `Fragment::Copy` before the next large write and when the buffer
    /// is drained, so write order is preserved.
    pending: BytesMut,
    /// Threshold below which writes are copied into a contiguous buffer.
    /// Writes >= this size are stored as zero-copy references.
    /// Captured from `SMALL_WRITE_THRESHOLD` when the buffer is created.
    threshold: usize,
}
113
114#[pymethods]
115impl Buffer {
116 /// Creates a new empty buffer.
117 ///
118 /// # Returns
119 /// A new empty `Buffer` instance.
120 #[new]
121 fn new() -> Self {
122 Self {
123 fragments: Vec::new(),
124 pending: BytesMut::new(),
125 threshold: hyperactor_config::global::get(SMALL_WRITE_THRESHOLD),
126 }
127 }
128
129 /// Writes bytes data to the buffer.
130 ///
131 /// Small writes (< 256 bytes) are copied into a contiguous buffer.
132 /// Large writes (>= 256 bytes) are stored as zero-copy references.
133 ///
134 /// # Arguments
135 /// * `buff` - The bytes object to write to the buffer
136 ///
137 /// # Returns
138 /// The number of bytes written (always equal to the length of input bytes)
139 fn write<'py>(&mut self, buff: &Bound<'py, PyBytes>) -> usize {
140 let bytes_written = buff.as_bytes().len();
141
142 if bytes_written < self.threshold {
143 self.pending.extend_from_slice(buff.as_bytes());
144 } else {
145 self.flush_pending();
146 self.fragments
147 .push(Fragment::Reference(buff.clone().unbind()));
148 }
149 bytes_written
150 }
151
152 /// Returns the total number of bytes in the buffer.
153 ///
154 /// This sums the lengths of all fragments (both copied and zero-copy) plus pending bytes.
155 ///
156 /// # Returns
157 /// The total number of bytes stored in the buffer
158 fn __len__(&self) -> usize {
159 let fragments_len: usize = Python::attach(|py| {
160 self.fragments
161 .iter()
162 .map(|frag| match frag {
163 Fragment::Copy(bytes) => bytes.len(),
164 Fragment::Reference(py_bytes) => py_bytes.as_bytes(py).len(),
165 })
166 .sum()
167 });
168 fragments_len + self.pending.len()
169 }
170
171 /// Freezes the buffer, converting it into an immutable `FrozenBuffer` for reading.
172 ///
173 /// This consumes all accumulated PyBytes and converts them into a contiguous bytes buffer.
174 /// After freezing, the original buffer is cleared.
175 ///
176 /// This operation should avoided in hot paths as it creates a copy in order to concatenate
177 /// bytes that are fragmented in memory into a single series of contiguous bytes
178 ///
179 /// # Returns
180 /// A new `FrozenBuffer` containing all the bytes that were written to this buffer
181 #[pyo3(name = "freeze")]
182 fn py_freeze(&mut self) -> FrozenBuffer {
183 Buffer::freeze(self)
184 }
185}
186
187impl Default for Buffer {
188 fn default() -> Self {
189 Self {
190 fragments: Vec::new(),
191 pending: BytesMut::new(),
192 threshold: hyperactor_config::global::get(SMALL_WRITE_THRESHOLD),
193 }
194 }
195}
196
197impl Buffer {
198 fn flush_pending(&mut self) {
199 if !self.pending.is_empty() {
200 let bytes = std::mem::take(&mut self.pending).freeze();
201 self.fragments.push(Fragment::Copy(bytes));
202 }
203 }
204
205 /// Converts accumulated data to [`Part`] for zero-copy multipart messages.
206 ///
207 /// Flushes any pending small writes and converts all fragments to bytes::Bytes.
208 pub fn take_part(&mut self) -> Part {
209 self.flush_pending();
210
211 let fragments = std::mem::take(&mut self.fragments);
212
213 Part::from_fragments(
214 fragments
215 .into_iter()
216 .map(|frag| match frag {
217 Fragment::Copy(bytes) => bytes,
218 Fragment::Reference(py_bytes) => {
219 let wrapper = KeepPyBytesAlive::new(py_bytes);
220 bytes::Bytes::from_owner(wrapper)
221 }
222 })
223 .collect::<Vec<_>>(),
224 )
225 }
226
227 /// Freezes the buffer, converting it into an immutable `FrozenBuffer` for reading.
228 ///
229 /// This is the Rust-accessible version of the Python freeze method.
230 pub fn freeze(&mut self) -> FrozenBuffer {
231 let part = self.take_part();
232 FrozenBuffer {
233 inner: part.into_bytes(),
234 }
235 }
236}
237
/// An immutable buffer for reading bytes data.
///
/// The `FrozenBuffer` struct provides a read-only interface to byte data. Once
/// created, the buffer's content cannot be modified. It supports several reading
/// operations, including line-by-line reading and copying data into external
/// buffers. Reads consume data: each one advances an internal read position. It
/// implements Python's buffer protocol for zero-copy access from Python code.
///
/// # Examples
///
/// ```python
/// from monarch._rust_bindings.monarch_hyperactor.buffers import Buffer
///
/// # Create and populate a buffer
/// buffer = Buffer()
/// buffer.write(b"Hello\nWorld\n")
///
/// # Freeze it for reading
/// frozen = buffer.freeze()
///
/// # Read all content
/// content = frozen.read()
/// print(content) # b"Hello\nWorld\n"
///
/// # Read line by line (create a new frozen buffer)
/// buffer.write(b"Line 1\nLine 2\n")
/// frozen = buffer.freeze()
/// line1 = frozen.readline()
/// line2 = frozen.readline()
/// ```
#[pyclass(subclass, module = "monarch._rust_bindings.monarch_hyperactor.buffers")]
#[derive(Clone, Serialize, Deserialize, Named, PartialEq, Default)]
pub struct FrozenBuffer {
    /// The bytes that have not been read yet; the Python read methods shrink
    /// this view from the front via `Buf::advance`.
    pub inner: bytes::Bytes,
}
// NOTE(review): presumably registers `FrozenBuffer` with wirevalue's type
// registry so it can be (de)serialized by type; confirm in the `wirevalue` crate.
wirevalue::register_type!(FrozenBuffer);
273
274impl From<Vec<u8>> for FrozenBuffer {
275 fn from(v: Vec<u8>) -> Self {
276 Self {
277 inner: bytes::Bytes::from(v),
278 }
279 }
280}
281
282impl From<&'static [u8]> for FrozenBuffer {
283 fn from(v: &'static [u8]) -> Self {
284 Self {
285 inner: bytes::Bytes::from_static(v),
286 }
287 }
288}
289
290impl From<FrozenBuffer> for Part {
291 fn from(buf: FrozenBuffer) -> Self {
292 Part::from(buf.inner)
293 }
294}
295
296#[pymethods]
297impl FrozenBuffer {
298 /// Reads bytes from the buffer.
299 ///
300 /// Advances the internal read position by the number of bytes read.
301 /// This is a consuming operation - once bytes are read, they cannot be read again.
302 ///
303 /// # Arguments
304 /// * `size` - Number of bytes to read. If -1 or not provided, reads all remaining bytes
305 ///
306 /// # Returns
307 /// A PyBytes object containing the bytes read from the buffer
308 #[pyo3(signature=(size=-1))]
309 fn read<'py>(mut slf: PyRefMut<'py, Self>, size: i64) -> Bound<'py, PyBytes> {
310 let size = if size <= 0 {
311 slf.inner.remaining() as i64
312 } else {
313 size.min(slf.inner.remaining() as i64)
314 } as usize;
315 let out = PyBytes::new(slf.py(), &slf.inner[..size]);
316 slf.inner.advance(size);
317 out
318 }
319
320 /// Returns the number of bytes remaining in the buffer.
321 ///
322 /// # Returns
323 /// The number of bytes that can still be read from the buffer
324 fn __len__(&self) -> usize {
325 self.inner.remaining()
326 }
327
328 /// Returns a string representation of the buffer content.
329 ///
330 /// This method provides a debug representation of the remaining bytes in the buffer.
331 ///
332 /// # Returns
333 /// A string showing the bytes remaining in the buffer
334 fn __str__(&self) -> String {
335 format!("{:?}", &self.inner[..])
336 }
337
338 /// Implements Python's buffer protocol for zero-copy access.
339 ///
340 /// This method allows Python code to access the buffer's underlying data without copying,
341 /// enabling efficient integration with memoryview, numpy arrays, and other buffer-aware
342 /// Python objects. The buffer is read-only and cannot be modified through this interface.
343 ///
344 /// # Safety
345 /// This method uses unsafe FFI calls to implement Python's buffer protocol.
346 /// The implementation ensures that:
347 /// - The buffer is marked as read-only
348 /// - A reference to the PyObject is held to prevent garbage collection
349 /// - Proper buffer metadata is set for Python interoperability
350 ///
351 /// Adapted from https://docs.rs/crate/pyo3/latest/source/tests/test_buffer.rs
352 unsafe fn __getbuffer__(
353 slf: PyRefMut<'_, Self>,
354 view: *mut pyo3::ffi::Py_buffer,
355 flags: c_int,
356 ) -> PyResult<()> {
357 if view.is_null() {
358 panic!("view is null");
359 }
360 if (flags & pyo3::ffi::PyBUF_WRITABLE) == pyo3::ffi::PyBUF_WRITABLE {
361 panic!("object not writable");
362 }
363 let bytes = &slf.inner;
364 // SAFETY: The view pointer is valid and we're setting up the buffer metadata correctly.
365 // The PyObject reference is held by setting (*view).obj to prevent garbage collection.
366 unsafe {
367 (*view).buf = bytes.as_ptr() as *mut c_void;
368 (*view).len = bytes.len() as isize;
369 (*view).readonly = 1;
370 (*view).itemsize = 1;
371 (*view).ndim = 1;
372 (*view).shape = &mut (*view).len;
373 (*view).strides = &mut (*view).itemsize;
374 (*view).suboffsets = std::ptr::null_mut();
375 (*view).internal = std::ptr::null_mut();
376 // This holds on to the reference to prevent garbage collection
377 (*view).obj = slf.into_ptr();
378 }
379 Ok(())
380 }
381
382 /// Reads a line from the buffer up to a newline character.
383 ///
384 /// Searches for the first newline character ('\n') within the specified size limit
385 /// and returns all bytes up to and including that character. If no newline is found
386 /// within the limit, returns up to `size` bytes. Advances the read position by the
387 /// number of bytes read.
388 ///
389 /// # Arguments
390 /// * `size` - Maximum number of bytes to read. If -1 or not provided, searches through all remaining bytes
391 ///
392 /// # Returns
393 /// A PyBytes object containing the line data (including the newline character if found)
394 #[pyo3(signature=(size=-1))]
395 fn readline<'py>(&mut self, py: Python<'py>, size: i64) -> Bound<'py, PyBytes> {
396 let max_size = if size < 0 {
397 self.inner.remaining() as i64
398 } else {
399 size.min(self.inner.remaining() as i64)
400 } as usize;
401 let size = self.inner[..max_size]
402 .iter()
403 .position(|x| *x == b'\n')
404 .unwrap_or(max_size);
405
406 let tmp = PyBytes::new(py, &self.inner[..max_size]);
407 self.inner.advance(size);
408 tmp
409 }
410
411 /// Reads bytes from the buffer into an existing buffer-like object.
412 ///
413 /// This method implements efficient copying of data from the FrozenBuffer into
414 /// any Python object that supports the buffer protocol (like bytearray, memoryview, etc.).
415 /// The number of bytes copied is limited by either the remaining bytes in this buffer
416 /// or the capacity of the destination buffer, whichever is smaller.
417 ///
418 /// # Arguments
419 /// * `b` - Any Python object that supports the buffer protocol for writing
420 ///
421 /// # Returns
422 /// The number of bytes actually copied into the destination buffer
423 ///
424 /// # Errors
425 /// Returns a PyBufferError if the destination object doesn't support the buffer protocol
426 /// or if there's an error during the copy operation
427 fn readinto<'py>(&mut self, py: Python<'py>, b: &Bound<'py, PyAny>) -> PyResult<i64> {
428 let buff: PyBuffer<u8> = PyBuffer::get(b)?;
429 let to_write = self.inner.remaining().min(buff.item_count());
430 buff.copy_from_slice(py, &self.inner[..to_write])?;
431 self.inner.advance(to_write);
432 Ok(to_write as i64)
433 }
434}
435
436pub fn register_python_bindings(hyperactor_mod: &Bound<'_, PyModule>) -> PyResult<()> {
437 hyperactor_mod.add_class::<Buffer>()?;
438 hyperactor_mod.add_class::<FrozenBuffer>()?;
439 Ok(())
440}