I'm looking for something that I think should be both easy to find and easy to implement, but I can't figure out how to do either!
I want something like this API:
let input_stream = /* some stream */;
let mut batched_stream = input_stream.batches(10);
while let Some(batch) = batched_stream.next().await {
    for value in batch {
        process(value);
    }
}
where the behaviour of next is one of:
- return 10 elements, if the underlying stream has at least 10 elements immediately available
- return all available elements, if the underlying stream has at least one but fewer than 10 elements immediately available
- await on the underlying stream, if it currently has no elements available
- return None, if the underlying stream is closed
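To pin down the semantics I mean, here is a rough blocking analogue built on std::sync::mpsc rather than on an async stream. The name next_batch is made up for illustration (it is not an existing API), and the sketch assumes the limit is at least 1. What I'm after is the async equivalent of this:

```rust
use std::sync::mpsc::{channel, Receiver, TryRecvError};

/// Hypothetical helper: a blocking analogue of the `next` described above.
/// Assumes `limit >= 1`.
fn next_batch<T>(rx: &Receiver<T>, limit: usize) -> Option<Vec<T>> {
    // Nothing available yet: block until one item arrives.
    // If the channel is closed and drained, `recv` fails and we return None.
    let first = rx.recv().ok()?;
    let mut batch = vec![first];

    // Take whatever else is immediately available, up to the limit,
    // without waiting for more.
    while batch.len() < limit {
        match rx.try_recv() {
            Ok(value) => batch.push(value),
            Err(TryRecvError::Empty) | Err(TryRecvError::Disconnected) => break,
        }
    }
    Some(batch)
}

fn main() {
    let (tx, rx) = channel::<i32>();
    for i in 0..25 {
        tx.send(i).unwrap();
    }
    drop(tx); // close the channel so the loop below terminates

    while let Some(batch) = next_batch(&rx, 10) {
        println!("{} items: {:?}", batch.len(), batch);
    }
}
```

With 25 queued items and a limit of 10, this yields batches of 10, 10, and 5 items, and then None once the channel is closed and empty.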
My specific use case is a futures::sync::channel::UnboundedReceiver wrapped in a stream_cancel::TakeUntil. I want to read all available messages from the channel, up to a limit, and process them immediately.
I think I could do this with the UnboundedReceiver::try_next method, but the docs explicitly say: "It is not recommended to call this function from inside of a future."
I've also looked at the peekable, chunks, fold, and buffered functions on StreamExt, but none of them has quite the semantics I'm thinking of. I've searched docs.rs for every search term I can come up with, but I'm still drawing a blank.
This seems like it should be a fairly common problem, so my question is: Am I missing an incredibly obvious method or library, or am I trying to do something I shouldn't be?