Does bytes::BytesMut really grow indefinitely?

Am I missing something, or will code like this:

let mut buffer = bytes::BytesMut::with_capacity(5);
let reader: &mut (dyn tokio::io::AsyncRead + Unpin) = ...;
reader.read_buf(&mut buffer).await;

grow the buffer indefinitely? How am I supposed to read terabyte files then?

This is from the bytes documentation:

Growth

BytesMut's BufMut implementation will implicitly grow its buffer as
necessary. However, explicitly reserving the required space up-front before
a series of inserts will be more efficient.
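
A minimal sketch of what that paragraph describes (my own example, not from the docs): writing past the capacity grows the buffer implicitly, while reserving up front grows it once:

    use bytes::{BufMut, BytesMut};

    fn main() {
        let mut buffer = BytesMut::with_capacity(5);
        buffer.put_slice(b"hello!"); // 6 bytes into 5 bytes of capacity: implicit growth
        assert!(buffer.capacity() > 5);

        let mut buffer = BytesMut::with_capacity(5);
        buffer.reserve(1024); // grow once, up front
        let cap = buffer.capacity();
        buffer.put_slice(&[0u8; 1024]); // fits in the reserved space
        assert_eq!(buffer.capacity(), cap); // no further growth
    }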

What is the best way to fix this? How can I control the maximum buffer size?

I came up with this, but it uses unsafe code (for performance reasons):

   let mut buffer = bytes::BytesMut::with_capacity(5);
   unsafe {
      // Expose the uninitialized spare capacity as part of the buffer.
      buffer.set_len(buffer.capacity());
   }
   ...
   reader.read(&mut buffer).await;

If you want to read only a certain number of bytes, you can use take:

let mut buffer = bytes::BytesMut::with_capacity(5);
let reader: &mut (dyn tokio::io::AsyncRead + Unpin) = ...;
reader.take(5).read_buf(&mut buffer).await;

Are you sure it will not allocate more?

This is from tokio:

        if !me.buf.has_remaining_mut() {
            return Poll::Ready(Ok(0));
        }

        let n = {
            let dst = me.buf.chunk_mut();

And the problem is that buf.chunk_mut decides to allocate more without asking me anything:

    #[inline]
    fn chunk_mut(&mut self) -> &mut UninitSlice {
        if self.capacity() == self.len() {
            self.reserve(64);             <--------------- ???!!!
        }
        self.spare_capacity_mut().into()
    }

Also check out the reserve method of BytesMut in the bytes docs.

It reuses memory, provided you consume the BytesMut from the front. If you don't consume the data, then it can grow indefinitely.

You can consume the front of a BytesMut, for example, by using the split_to method to split off a relevant chunk for processing, then process that chunk somewhere, somehow, and eventually drop it.
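
A minimal sketch of that pattern (function name and chunk size are mine):

    use bytes::BytesMut;

    fn drain(buffer: &mut BytesMut) {
        const CHUNK: usize = 4;
        while buffer.len() >= CHUNK {
            // Split off the front CHUNK bytes as an independent handle.
            let chunk = buffer.split_to(CHUNK);
            println!("processing {} bytes", chunk.len());
            // `chunk` is dropped here; once no handle refers to that region,
            // a later reserve() can reuse the memory instead of allocating.
        }
    }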

Does that get called? capacity should be 5 while len should be 0.

A BytesMut will only ever allocate if you try to perform an operation that can only succeed if the BytesMut is enlarged. Since take(5) doesn't read more than 5 bytes and the BytesMut can fit 5 bytes, it will not allocate in that case.

As a maintainer of both Tokio and bytes, I can guarantee you that this behavior will not change in the future.
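
A quick way to check this claim (a sketch, not from the thread; it relies on tokio implementing AsyncRead for &[u8]):

    use bytes::BytesMut;
    use tokio::io::AsyncReadExt;

    #[tokio::main]
    async fn main() -> std::io::Result<()> {
        let reader: &[u8] = b"hello world";
        let mut buffer = BytesMut::with_capacity(5);
        let cap_before = buffer.capacity();
        reader.take(5).read_buf(&mut buffer).await?;
        assert_eq!(&buffer[..], &b"hello"[..]);
        assert_eq!(buffer.capacity(), cap_before); // no allocation happened
        Ok(())
    }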

Appreciate your response!

But I think I am still missing something...

I think my code below "performs an operation (read) that can only succeed if the BytesMut is enlarged":

use anyhow::{anyhow, Context, Result};
use bytes::BufMut;
use tokio::io::{AsyncRead, AsyncReadExt};

/// Just like read_exact() in tokio's `pub trait AsyncReadExt: AsyncRead`, but it does not
/// treat early eof as an error:
pub async fn read_exactly_ignoring_early_eof<R: AsyncRead + Unpin>(
    reader: &mut R,
    buffer: &mut impl BufMut,
) -> Result<u64> {
   let mut total_bytes_read: u64 = 0;
   loop {
      // Failed to read bytes? => return Err()
      let bytes_read = reader.read_buf(buffer).await.with_context(|| {
                          anyhow!("Successfully read: {total_bytes_read} bytes. Failed to read more.")
                       })? as u64;
      total_bytes_read += bytes_read;
      // Read 0 bytes? => eof => return what has been read so far.
      // No space left in the buffer? => return what has been read so far.
      if bytes_read == 0 || !buffer.has_remaining_mut() {
         break;
      }
   }

   Ok(total_bytes_read)
}

As you can see, I call read_buf in a loop, which I guess will trigger allocations.

The reason I had to write this code is that:

  • I cannot use read_exact, because it treats eof as an error, and does not guarantee the state of the buffer
  • I cannot use read because it is allowed to read "some bytes", and I really need to fill exactly buffer_size bytes (unless the file I am reading is exhausted first):
        /// Pulls some bytes from this source into the specified buffer,
        /// returning how many bytes were read.

In other words, imagine a situation where it is important to read whichever is less:

  • a fixed buffer size (like 128MiB)
    • when can this be important? Imagine content-addressable storage, where every chunk (128MiB) can be addressed by its hash (and files are composed of a list of hashes <-> chunks). You need a bit more control over the size of the buffer/chunk than "some bytes".
  • what is left in the file

You can fix it by stopping the loop if the buffer is full. Change the argument from &mut impl BufMut to &mut BytesMut and replace the loop { with this:

while buffer.len() < buffer.capacity() {

This ensures that you will not call read_buf in a situation where it will reallocate.
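
Put together, the fixed function would look something like this (my assembly of the suggested change, not code from the thread):

    use anyhow::{anyhow, Context, Result};
    use bytes::BytesMut;
    use tokio::io::{AsyncRead, AsyncReadExt};

    pub async fn read_exactly_ignoring_early_eof<R: AsyncRead + Unpin>(
        reader: &mut R,
        buffer: &mut BytesMut,
    ) -> Result<u64> {
        let mut total_bytes_read: u64 = 0;
        // Stop as soon as the buffer is full, so read_buf is never called in a
        // situation where chunk_mut would need to reserve more space.
        while buffer.len() < buffer.capacity() {
            let bytes_read = reader.read_buf(buffer).await.with_context(|| {
                anyhow!("Successfully read: {total_bytes_read} bytes. Failed to read more.")
            })? as u64;
            if bytes_read == 0 {
                break; // eof => return what has been read so far
            }
            total_bytes_read += bytes_read;
        }

        Ok(total_bytes_read)
    }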

There's also another possibility:

io.take(max_len).read_to_end(&mut vec).await?;

This only works with Vec<u8> and not BytesMut, but it works similarly.
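
For example (a sketch; the function name is mine):

    use tokio::io::{AsyncRead, AsyncReadExt};

    async fn read_at_most<R: AsyncRead + Unpin>(io: &mut R, max_len: u64) -> std::io::Result<Vec<u8>> {
        let mut vec = Vec::new();
        // read_to_end stops at eof, and Take caps the total at max_len,
        // so vec's length never exceeds max_len.
        io.take(max_len).read_to_end(&mut vec).await?;
        Ok(vec)
    }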

This is smart... Essentially you rely on the fact that, as long as there is spare capacity, chunk_mut returns it without allocating:

    #[inline]
    fn chunk_mut(&mut self) -> &mut UninitSlice {
        if self.capacity() == self.len() {
            self.reserve(64);
        }
        self.spare_capacity_mut().into()
    }

and this is precisely when buffer.len() becomes equal to buffer.capacity() and the loop in my code stops.
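
A tiny check of that reading (again a sketch, not from the thread):

    use bytes::{BufMut, BytesMut};

    fn main() {
        let mut buffer = BytesMut::with_capacity(5);
        buffer.put_u8(1);
        let cap = buffer.capacity();
        // There is spare capacity, so chunk_mut returns it without reserving.
        assert_eq!(buffer.chunk_mut().len(), cap - 1);
        assert_eq!(buffer.capacity(), cap); // unchanged: no allocation
    }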

One last question: is it really guaranteed to work, though?

On the one hand, yes:

fn chunk_mut(&mut self) -> &mut UninitSlice;
Returns a mutable slice starting at the current BufMut position and of
length between 0 and BufMut::remaining_mut().

On the other hand there is this:

This function may trigger an out-of-memory abort if it tries to allocate
memory and fails to do so.

Yes. If the documentation does not give sufficient guarantees for this, then that's a bug in the documentation.

We're only concerned with the case where it does not try to allocate memory, so this is not relevant.
