Collect available results from a futures Stream

I'm looking for something that I think should be both easy to find and easy to implement, but I can't figure out how to do either!

I want something like this API:

let input_stream = /* some stream */;
let mut batched_stream = input_stream.batches(10);
while let Some(batch) = batched_stream.next().await {
    for value in batch {
        process(value);
    }
}

Where the behaviour of next is to either:

  • Return 10 elements from the underlying stream, if the underlying stream has at least 10 elements immediately available
  • Return all available elements from the underlying stream, if it has at least one but fewer than 10 elements available
  • Await the underlying stream if it currently has no elements available
  • Return None if the underlying stream is closed

My specific use case is a futures::channel::mpsc::UnboundedReceiver wrapped in a stream_cancel::TakeUntil. I want to read all available messages from the channel, up to a limit, and process them immediately.

I think I could do this with the UnboundedReceiver::try_next method, but the docs explicitly say: "It is not recommended to call this function from inside of a future."

I've also looked at the peekable, chunks, fold, and buffered functions on StreamExt, but they don't have quite the semantics I'm after. I've searched docs.rs for every search term I can come up with, but I'm still drawing a blank.

This seems like it should be a fairly common problem, so my question is: am I missing an incredibly obvious method or library, or am I trying to do something I shouldn't be? :)

You should just use try_next. The caveat in the documentation is that it won't schedule your future for wake-up when items become available, but as long as you await the stream properly in the third case (when nothing is available yet), that is not an issue.
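To make the four cases concrete, here is a minimal sketch of the polling logic, not a drop-in implementation. So that it runs with only the standard library, it uses a simplified stand-in trait, SimpleStream, instead of futures::Stream (no Pin). Batches, Scripted and NoopWake are likewise hypothetical names invented for this illustration. The key point is the same as with try_next: a "not ready" result is only passed on to the caller when the batch is still empty, which is the one case where the inner stream has registered the waker and you really do wait. With a real UnboundedReceiver the equivalent shape would be to await next() for the first item and then call try_next() until it stops yielding items or the limit is reached.

```rust
use std::collections::VecDeque;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Simplified stand-in for `futures::Stream` (no `Pin`), an assumption made
/// so the sketch compiles with only the standard library.
trait SimpleStream {
    type Item;
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

/// Hypothetical adaptor: yields up to `limit` items that are ready right now.
/// Assumes `limit > 0` and that `inner` keeps returning `None` once closed.
struct Batches<S> {
    inner: S,
    limit: usize,
}

impl<S: SimpleStream> SimpleStream for Batches<S> {
    type Item = Vec<S::Item>;

    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut batch = Vec::new();
        while batch.len() < self.limit {
            match self.inner.poll_next(cx) {
                Poll::Ready(Some(item)) => batch.push(item),
                // Closed with nothing collected: the batched stream ends too.
                Poll::Ready(None) if batch.is_empty() => return Poll::Ready(None),
                // Closed mid-batch: flush now, report the end on the next poll.
                Poll::Ready(None) => break,
                // Nothing ready at all: wait (the inner stream holds the waker).
                Poll::Pending if batch.is_empty() => return Poll::Pending,
                // Some items collected, no more ready: return a short batch.
                Poll::Pending => break,
            }
        }
        Poll::Ready(Some(batch))
    }
}

/// Test double: replays a script in which `None` means "nothing ready yet";
/// an exhausted script means "closed".
struct Scripted(VecDeque<Option<u32>>);

impl SimpleStream for Scripted {
    type Item = u32;
    fn poll_next(&mut self, _cx: &mut Context<'_>) -> Poll<Option<u32>> {
        match self.0.pop_front() {
            Some(Some(value)) => Poll::Ready(Some(value)),
            Some(None) => Poll::Pending, // a real stream would store the waker here
            None => Poll::Ready(None),
        }
    }
}

/// Waker that does nothing; enough to drive polls by hand.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls the batched stream five times and records each result.
fn demo() -> Vec<Poll<Option<Vec<u32>>>> {
    let script = vec![Some(1), Some(2), Some(3), None, None, Some(4)];
    let mut batches = Batches { inner: Scripted(script.into()), limit: 2 };
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    (0..5).map(|_| batches.poll_next(&mut cx)).collect()
}

fn main() {
    for step in demo() {
        println!("{:?}", step);
    }
}
```

With a limit of 2, the five polls produce a full batch [1, 2], a short batch [3], Pending, a final batch [4], and then None. Since your receiver is wrapped in another stream, try_next may not be reachable directly. In that case it is worth checking whether your version of futures has StreamExt::ready_chunks, which is described as chunking up the ready items of a stream and sounds close to what you describe.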
