monarch_hyperactor/
buffers.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9#![allow(unsafe_op_in_unsafe_fn)]
10
11use std::ffi::c_int;
12use std::ffi::c_void;
13
14use bytes::Buf;
15use bytes::Bytes;
16use bytes::BytesMut;
17use hyperactor_config::CONFIG;
18use hyperactor_config::ConfigAttr;
19use hyperactor_config::attrs::declare_attrs;
20use pyo3::buffer::PyBuffer;
21use pyo3::prelude::*;
22use pyo3::types::PyBytes;
23use pyo3::types::PyBytesMethods;
24use serde::Deserialize;
25use serde::Serialize;
26use serde_multipart::Part;
27use typeuri::Named;
28
// Tunable for `Buffer`: the size cut-off between "copy into the pending
// accumulator" and "keep a reference to the Python bytes object".
// The two strings handed to `ConfigAttr::new` are presumably an environment
// variable name and a config key — TODO confirm lookup precedence against
// `hyperactor_config`. The declared default is 256.
declare_attrs! {
    /// Threshold below which writes are copied into a contiguous buffer.
    /// Writes >= this size are stored as zero-copy references.
    @meta(CONFIG = ConfigAttr::new(
        Some("MONARCH_HYPERACTOR_SMALL_WRITE_THRESHOLD".to_string()),
        Some("small_write_threshold".to_string()),
    ))
    pub attr SMALL_WRITE_THRESHOLD: usize = 256;
}
38
/// Wrapper that keeps Py<PyBytes> alive while allowing zero-copy access to its memory.
///
/// The pointer/length pair is captured once in `new` and handed out as a byte
/// slice through `AsRef<[u8]>`, which lets the wrapper act as the owner behind
/// `bytes::Bytes::from_owner`.
struct KeepPyBytesAlive {
    /// Owning handle. Never read; held only so the bytes object outlives `ptr`.
    _py_bytes: Py<PyBytes>,
    /// Start of the bytes object's payload, as observed in `new`.
    ptr: *const u8,
    /// Length in bytes of the payload starting at `ptr`.
    len: usize,
}
45
46impl KeepPyBytesAlive {
47    fn new(py_bytes: Py<PyBytes>) -> Self {
48        let (ptr, len) = Python::attach(|py| {
49            let bytes_ref = py_bytes.as_bytes(py);
50            (bytes_ref.as_ptr(), bytes_ref.len())
51        });
52        Self {
53            _py_bytes: py_bytes,
54            ptr,
55            len,
56        }
57    }
58}
59
60impl AsRef<[u8]> for KeepPyBytesAlive {
61    fn as_ref(&self) -> &[u8] {
62        // SAFETY: ptr is valid as long as py_bytes is alive (kept alive by Py<PyBytes>)
63        // Python won't free the memory until the Py<PyBytes> refcount reaches 0
64        unsafe { std::slice::from_raw_parts(self.ptr, self.len) }
65    }
66}
67
// SAFETY: the raw `ptr` field is what prevents these auto traits from being
// derived. It is only ever read (never written) through `as_ref`, and it
// points into the bytes object kept alive by `_py_bytes`.
// Py<PyBytes> is Send/Sync for immutable bytes.
unsafe impl Send for KeepPyBytesAlive {}
// SAFETY: same reasoning as for `Send` — shared access only ever reads the
// immutable bytes payload. Py<PyBytes> is Send/Sync for immutable bytes.
unsafe impl Sync for KeepPyBytesAlive {}
72
/// A fragment of data in the buffer, either a copy or a reference.
///
/// `Buffer` stores these in write order; `Buffer::take_part` later turns each
/// fragment into a `bytes::Bytes`.
#[derive(Clone)]
enum Fragment {
    /// Small writes that were copied into a contiguous buffer
    /// (created when the pending accumulator is flushed).
    Copy(Bytes),
    /// Large writes stored as references to Python bytes
    /// (the object handle is retained; the payload is not copied at write time).
    Reference(Py<PyBytes>),
}
81
/// A mutable, write-only accumulator for bytes data.
///
/// The `Buffer` struct provides a hybrid interface for accumulating byte data:
/// - Small writes (shorter than the threshold) are copied into a contiguous buffer to minimize
///   fragment overhead
/// - Large writes (at or above the threshold) are stored as zero-copy references to Python
///   bytes objects
///
/// The threshold is read from the `SMALL_WRITE_THRESHOLD` config attribute when the buffer is
/// created; its declared default is 256 bytes.
///
/// This approach balances the overhead of per-fragment processing against the cost of copying data.
///
/// # Examples
///
/// ```python
/// from monarch._rust_bindings.monarch_hyperactor.buffers import Buffer
///
/// # Create a new buffer
/// buffer = Buffer()
///
/// # Write some data - small writes are batched, large writes are zero-copy
/// # (the comments below assume the default threshold of 256 bytes)
/// buffer.write(b"small")  # copied into pending buffer
/// buffer.write(b"x" * 1000)  # stored as zero-copy reference
/// ```
#[pyclass(subclass, module = "monarch._rust_bindings.monarch_hyperactor.buffers")]
#[derive(Clone)]
pub struct Buffer {
    /// Finalized fragments in write order
    fragments: Vec<Fragment>,
    /// Accumulator for pending small writes; flushed into `fragments` as a
    /// single copied fragment before a large write is recorded or the data is taken.
    pending: BytesMut,
    /// Threshold below which writes are copied into a contiguous buffer.
    /// Writes >= this size are stored as zero-copy references.
    /// Captured once at construction; later config changes do not affect this instance.
    threshold: usize,
}
113
114#[pymethods]
115impl Buffer {
116    /// Creates a new empty buffer.
117    ///
118    /// # Returns
119    /// A new empty `Buffer` instance.
120    #[new]
121    fn new() -> Self {
122        Self {
123            fragments: Vec::new(),
124            pending: BytesMut::new(),
125            threshold: hyperactor_config::global::get(SMALL_WRITE_THRESHOLD),
126        }
127    }
128
129    /// Writes bytes data to the buffer.
130    ///
131    /// Small writes (< 256 bytes) are copied into a contiguous buffer.
132    /// Large writes (>= 256 bytes) are stored as zero-copy references.
133    ///
134    /// # Arguments
135    /// * `buff` - The bytes object to write to the buffer
136    ///
137    /// # Returns
138    /// The number of bytes written (always equal to the length of input bytes)
139    fn write<'py>(&mut self, buff: &Bound<'py, PyBytes>) -> usize {
140        let bytes_written = buff.as_bytes().len();
141
142        if bytes_written < self.threshold {
143            self.pending.extend_from_slice(buff.as_bytes());
144        } else {
145            self.flush_pending();
146            self.fragments
147                .push(Fragment::Reference(buff.clone().unbind()));
148        }
149        bytes_written
150    }
151
152    /// Returns the total number of bytes in the buffer.
153    ///
154    /// This sums the lengths of all fragments (both copied and zero-copy) plus pending bytes.
155    ///
156    /// # Returns
157    /// The total number of bytes stored in the buffer
158    fn __len__(&self) -> usize {
159        let fragments_len: usize = Python::attach(|py| {
160            self.fragments
161                .iter()
162                .map(|frag| match frag {
163                    Fragment::Copy(bytes) => bytes.len(),
164                    Fragment::Reference(py_bytes) => py_bytes.as_bytes(py).len(),
165                })
166                .sum()
167        });
168        fragments_len + self.pending.len()
169    }
170
171    /// Freezes the buffer, converting it into an immutable `FrozenBuffer` for reading.
172    ///
173    /// This consumes all accumulated PyBytes and converts them into a contiguous bytes buffer.
174    /// After freezing, the original buffer is cleared.
175    ///
176    /// This operation should avoided in hot paths as it creates a copy in order to concatenate
177    /// bytes that are fragmented in memory into a single series of contiguous bytes
178    ///
179    /// # Returns
180    /// A new `FrozenBuffer` containing all the bytes that were written to this buffer
181    #[pyo3(name = "freeze")]
182    fn py_freeze(&mut self) -> FrozenBuffer {
183        Buffer::freeze(self)
184    }
185}
186
187impl Default for Buffer {
188    fn default() -> Self {
189        Self {
190            fragments: Vec::new(),
191            pending: BytesMut::new(),
192            threshold: hyperactor_config::global::get(SMALL_WRITE_THRESHOLD),
193        }
194    }
195}
196
197impl Buffer {
198    fn flush_pending(&mut self) {
199        if !self.pending.is_empty() {
200            let bytes = std::mem::take(&mut self.pending).freeze();
201            self.fragments.push(Fragment::Copy(bytes));
202        }
203    }
204
205    /// Converts accumulated data to [`Part`] for zero-copy multipart messages.
206    ///
207    /// Flushes any pending small writes and converts all fragments to bytes::Bytes.
208    pub fn take_part(&mut self) -> Part {
209        self.flush_pending();
210
211        let fragments = std::mem::take(&mut self.fragments);
212
213        Part::from_fragments(
214            fragments
215                .into_iter()
216                .map(|frag| match frag {
217                    Fragment::Copy(bytes) => bytes,
218                    Fragment::Reference(py_bytes) => {
219                        let wrapper = KeepPyBytesAlive::new(py_bytes);
220                        bytes::Bytes::from_owner(wrapper)
221                    }
222                })
223                .collect::<Vec<_>>(),
224        )
225    }
226
227    /// Freezes the buffer, converting it into an immutable `FrozenBuffer` for reading.
228    ///
229    /// This is the Rust-accessible version of the Python freeze method.
230    pub fn freeze(&mut self) -> FrozenBuffer {
231        let part = self.take_part();
232        FrozenBuffer {
233            inner: part.into_bytes(),
234        }
235    }
236}
237
/// An immutable buffer for reading bytes data.
///
/// The `FrozenBuffer` struct provides a read-only interface to byte data. Once created,
/// the buffer's content cannot be modified, but it supports various reading operations
/// including line-by-line reading and copying data to external buffers. It implements
/// Python's buffer protocol for zero-copy access from Python code.
///
/// Reads are consuming: each read operation advances past the bytes it handed out, so
/// `len()` reports only what is still unread.
///
/// # Examples
///
/// ```python
/// from monarch._rust_bindings.monarch_hyperactor.buffers import Buffer
///
/// # Create and populate a buffer
/// buffer = Buffer()
/// buffer.write(b"Hello\nWorld\n")
///
/// # Freeze it for reading
/// frozen = buffer.freeze()
///
/// # Read all content
/// content = frozen.read()
/// print(content)  # b"Hello\nWorld\n"
///
/// # Read line by line (create a new frozen buffer)
/// buffer.write(b"Line 1\nLine 2\n")
/// frozen = buffer.freeze()
/// line1 = frozen.readline()
/// line2 = frozen.readline()
/// ```
#[pyclass(subclass, module = "monarch._rust_bindings.monarch_hyperactor.buffers")]
#[derive(Clone, Serialize, Deserialize, Named, PartialEq, Default)]
pub struct FrozenBuffer {
    /// The bytes that have not been read yet; read operations advance this in place.
    pub inner: bytes::Bytes,
}
// Registers the type with `wirevalue` (presumably so it can be used as a wire
// value by name — confirm against the `wirevalue` crate).
wirevalue::register_type!(FrozenBuffer);
273
274impl From<Vec<u8>> for FrozenBuffer {
275    fn from(v: Vec<u8>) -> Self {
276        Self {
277            inner: bytes::Bytes::from(v),
278        }
279    }
280}
281
282impl From<&'static [u8]> for FrozenBuffer {
283    fn from(v: &'static [u8]) -> Self {
284        Self {
285            inner: bytes::Bytes::from_static(v),
286        }
287    }
288}
289
290impl From<FrozenBuffer> for Part {
291    fn from(buf: FrozenBuffer) -> Self {
292        Part::from(buf.inner)
293    }
294}
295
296#[pymethods]
297impl FrozenBuffer {
298    /// Reads bytes from the buffer.
299    ///
300    /// Advances the internal read position by the number of bytes read.
301    /// This is a consuming operation - once bytes are read, they cannot be read again.
302    ///
303    /// # Arguments
304    /// * `size` - Number of bytes to read. If -1 or not provided, reads all remaining bytes
305    ///
306    /// # Returns
307    /// A PyBytes object containing the bytes read from the buffer
308    #[pyo3(signature=(size=-1))]
309    fn read<'py>(mut slf: PyRefMut<'py, Self>, size: i64) -> Bound<'py, PyBytes> {
310        let size = if size <= 0 {
311            slf.inner.remaining() as i64
312        } else {
313            size.min(slf.inner.remaining() as i64)
314        } as usize;
315        let out = PyBytes::new(slf.py(), &slf.inner[..size]);
316        slf.inner.advance(size);
317        out
318    }
319
320    /// Returns the number of bytes remaining in the buffer.
321    ///
322    /// # Returns
323    /// The number of bytes that can still be read from the buffer
324    fn __len__(&self) -> usize {
325        self.inner.remaining()
326    }
327
328    /// Returns a string representation of the buffer content.
329    ///
330    /// This method provides a debug representation of the remaining bytes in the buffer.
331    ///
332    /// # Returns
333    /// A string showing the bytes remaining in the buffer
334    fn __str__(&self) -> String {
335        format!("{:?}", &self.inner[..])
336    }
337
338    /// Implements Python's buffer protocol for zero-copy access.
339    ///
340    /// This method allows Python code to access the buffer's underlying data without copying,
341    /// enabling efficient integration with memoryview, numpy arrays, and other buffer-aware
342    /// Python objects. The buffer is read-only and cannot be modified through this interface.
343    ///
344    /// # Safety
345    /// This method uses unsafe FFI calls to implement Python's buffer protocol.
346    /// The implementation ensures that:
347    /// - The buffer is marked as read-only
348    /// - A reference to the PyObject is held to prevent garbage collection
349    /// - Proper buffer metadata is set for Python interoperability
350    ///
351    /// Adapted from https://docs.rs/crate/pyo3/latest/source/tests/test_buffer.rs
352    unsafe fn __getbuffer__(
353        slf: PyRefMut<'_, Self>,
354        view: *mut pyo3::ffi::Py_buffer,
355        flags: c_int,
356    ) -> PyResult<()> {
357        if view.is_null() {
358            panic!("view is null");
359        }
360        if (flags & pyo3::ffi::PyBUF_WRITABLE) == pyo3::ffi::PyBUF_WRITABLE {
361            panic!("object not writable");
362        }
363        let bytes = &slf.inner;
364        // SAFETY: The view pointer is valid and we're setting up the buffer metadata correctly.
365        // The PyObject reference is held by setting (*view).obj to prevent garbage collection.
366        unsafe {
367            (*view).buf = bytes.as_ptr() as *mut c_void;
368            (*view).len = bytes.len() as isize;
369            (*view).readonly = 1;
370            (*view).itemsize = 1;
371            (*view).ndim = 1;
372            (*view).shape = &mut (*view).len;
373            (*view).strides = &mut (*view).itemsize;
374            (*view).suboffsets = std::ptr::null_mut();
375            (*view).internal = std::ptr::null_mut();
376            // This holds on to the reference to prevent garbage collection
377            (*view).obj = slf.into_ptr();
378        }
379        Ok(())
380    }
381
382    /// Reads a line from the buffer up to a newline character.
383    ///
384    /// Searches for the first newline character ('\n') within the specified size limit
385    /// and returns all bytes up to and including that character. If no newline is found
386    /// within the limit, returns up to `size` bytes. Advances the read position by the
387    /// number of bytes read.
388    ///
389    /// # Arguments
390    /// * `size` - Maximum number of bytes to read. If -1 or not provided, searches through all remaining bytes
391    ///
392    /// # Returns
393    /// A PyBytes object containing the line data (including the newline character if found)
394    #[pyo3(signature=(size=-1))]
395    fn readline<'py>(&mut self, py: Python<'py>, size: i64) -> Bound<'py, PyBytes> {
396        let max_size = if size < 0 {
397            self.inner.remaining() as i64
398        } else {
399            size.min(self.inner.remaining() as i64)
400        } as usize;
401        let size = self.inner[..max_size]
402            .iter()
403            .position(|x| *x == b'\n')
404            .unwrap_or(max_size);
405
406        let tmp = PyBytes::new(py, &self.inner[..max_size]);
407        self.inner.advance(size);
408        tmp
409    }
410
411    /// Reads bytes from the buffer into an existing buffer-like object.
412    ///
413    /// This method implements efficient copying of data from the FrozenBuffer into
414    /// any Python object that supports the buffer protocol (like bytearray, memoryview, etc.).
415    /// The number of bytes copied is limited by either the remaining bytes in this buffer
416    /// or the capacity of the destination buffer, whichever is smaller.
417    ///
418    /// # Arguments
419    /// * `b` - Any Python object that supports the buffer protocol for writing
420    ///
421    /// # Returns
422    /// The number of bytes actually copied into the destination buffer
423    ///
424    /// # Errors
425    /// Returns a PyBufferError if the destination object doesn't support the buffer protocol
426    /// or if there's an error during the copy operation
427    fn readinto<'py>(&mut self, py: Python<'py>, b: &Bound<'py, PyAny>) -> PyResult<i64> {
428        let buff: PyBuffer<u8> = PyBuffer::get(b)?;
429        let to_write = self.inner.remaining().min(buff.item_count());
430        buff.copy_from_slice(py, &self.inner[..to_write])?;
431        self.inner.advance(to_write);
432        Ok(to_write as i64)
433    }
434}
435
436pub fn register_python_bindings(hyperactor_mod: &Bound<'_, PyModule>) -> PyResult<()> {
437    hyperactor_mod.add_class::<Buffer>()?;
438    hyperactor_mod.add_class::<FrozenBuffer>()?;
439    Ok(())
440}