Hey, so after hanging around with some people smarter than me, I was advised to write this code instead of sending a new Vec through the channel (resultant_tx) every time:
let mut buf = [0; 4096];
let mut bytes_buf = BytesMut::with_capacity(buf.len());
loop {
    let bytes_read = stdout.read(&mut buf)?;
    if bytes_read == 0 {
        return Result::<_, ServerError>::Ok(());
    }
    // Only append the bytes actually read, not the whole 4096-byte buffer.
    bytes_buf.extend_from_slice(&buf[..bytes_read]);
    let bytes = bytes_buf.split_to(bytes_read).freeze();
    resultant_tx
        .blocking_send(bytes)
        .expect("rx is never dropped");
}
But I thought Bytes was basically an Arc<Vec<T>> with some extra functionality.
The above code looks kind of magical to me, honestly... extending, splitting, and freezing.
I wouldn't know where to look to understand what it does or why it works the way it does.
Splitting is an important use-case. You may be working with a network protocol that has some header before the rest of the data. With a Vec, you have a choice of allocating another Vec, or copying data within a Vec, or using a temporarily borrowed slice, which is possible only in limited cases.
However, Bytes is able to behave like Arc<Vec> for any part of the Vec, without copying, so you can slice it up and return/store/send parts of it.
Interesting, okay, so I believe what is happening is that I'm creating "owned slices" into the underlying Bytes buffer.
But does that mean that bytes_buf is accumulating everything in RAM? That's precisely what I don't want to happen... I want to send off chunks as a stream, not hold the entire output in memory.
This was also my first thought when looking at the code. I think if bytes_buf needs to reallocate, it has to detach from all the old Bytes first, so it should be fine. But you should test it to see what happens.
In any case, if you want to have more control over things and use Vec instead, you could have a channel going the opposite direction that sends back the consumed Vecs so you can reuse their allocations. If the channel is empty, this side can create a new Vec, and if the channel is full, the other side can simply drop the Vec instead.
Personally, I would stick with Vec first. I don't think you should worry about excessive RAM usage either way. BytesMut does not guarantee an increase in performance: a single Arc that gets cloned and dropped frequently across threads can be a performance killer compared to more allocation. Optimisation should come after you have working code.
The reserve doc gives a hint of what is being done. Best to read the source to dive into the details.
Well, I wanted to play with alternatives, but it turns out Actix needs a stream where Item = Bytes (not sure why they didn't choose something like AsRef<[u8]> + 'static + Clone or similar).
In any case, I added a dbg!(bytes_buf.len()) right after the call to extend, and it never grows! So that's good.
You probably want to check bytes_buf.capacity() instead. That tells you how big the allocation is, rather than how many "live" bytes it currently contains.
The docs state that the extend method will call reserve to allocate more space; other write methods will panic if there isn't enough capacity. So your code would just continue to allocate.
EDIT
Extend uses reserve to allocate new memory, and the reserve docs describe when previously split-off memory can be reclaimed.
So as your Bytes presumably get dropped on the other end of resultant_tx, the "lost" memory should be reclaimed at some point by reserve, as long as bytes_buf is the only BytesMut left and all the Bytes have been dropped.
EDIT 2
Sorry, I didn't see @jonh's answer. He linked a newer version of the docs, which documents that the exact conditions for when space is reclaimed and when new space is allocated may be subject to change. So no guarantee across versions.