Next value of Stream if immediately available?

I feel like this should be a pretty common need. I want to consume all of the immediately available elements of a Stream (up to a maximum number) without waiting for more to arrive. Given the signature of Stream::poll_next, it seems like this should be very achievable, but I can't see how to do it. My guess is that I just can't find the right search terms.

Any suggestions?

Here is my first attempt at implementing a next_immediate():

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

use futures_util::stream::StreamExt;
use futures_util::Stream;

pub trait QuickStream {
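    /// Resolves to `Some(Some(item))` if an item is immediately available,
    /// `Some(None)` if the stream has ended, and `None` if nothing is ready yet.
    ///
    /// This future always completes on its first poll, so awaiting it never waits.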
    fn next_immediate(&mut self) -> NextImmediate<'_, Self>;
}
impl<S: ?Sized + Stream + Unpin> QuickStream for S {
    fn next_immediate(&mut self) -> NextImmediate<'_, Self> {
        NextImmediate { stream: self }
    }
}

/// Future for the [`next_immediate`](QuickStream::next_immediate) method.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct NextImmediate<'a, St: ?Sized> {
    stream: &'a mut St,
}

impl<St: ?Sized + Unpin> Unpin for NextImmediate<'_, St> {}

impl<St: ?Sized + Stream + Unpin> Future for NextImmediate<'_, St> {
    type Output = Option<Option<St::Item>>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
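        match self.stream.poll_next_unpin(cx) {
            // The stream produced an item (`Some`) or has ended (`None`).
            Poll::Ready(v) => Poll::Ready(Some(v)),
            // Nothing is ready yet. The stream has still registered `cx`'s waker,
            // so the task may see a spurious wakeup later.
            Poll::Pending => Poll::Ready(None),
        }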
    }
}

I'm completely new to this sort of work, so any suggestions for improvement would be greatly appreciated!

You can call .next().now_or_never() in a loop until it returns None (nothing is ready right now) or Some(None) (the stream has ended).

The now_or_never method comes from FutureExt in the futures crate.
