I feel like this should be a pretty common need. I want to consume all of the immediately available elements of a Stream
(up to a max number). Given the type of Stream::poll
it seems like this should be very achievable, but I can't see how to do this. My guess is that I just can't find the right search terms.
Any suggestions?
Here is my first attempt at implementing a next_immediate()
. Any suggestions?
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use futures_util::stream::StreamExt;
use futures_util::Stream;
pub trait QuickStream {
/// If an item is immediately available then return it, otherwise return `None`.
///
/// This future is guaranteed not to block.
fn next_immediate(&mut self) -> NextImmediate<'_, Self>;
}
impl<S: ?Sized + Stream + Unpin> QuickStream for S {
fn next_immediate(&mut self) -> NextImmediate<'_, Self> {
NextImmediate { stream: self }
}
}
/// Future for the [`next_immediate`](NextImmediate::next_immediate) method.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct NextImmediate<'a, St: ?Sized> {
stream: &'a mut St,
}
impl<St: ?Sized + Unpin> Unpin for NextImmediate<'_, St> {}
impl<St: ?Sized + Stream + Unpin> Future for NextImmediate<'_, St> {
type Output = Option<Option<St::Item>>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.stream.poll_next_unpin(cx) {
Poll::Ready(v) => Poll::Ready(Some(v)),
Poll::Pending => Poll::Ready(None),
}
}
}
I'm completely new to this sort of work, so any suggestions for improvement would be greatly appreciated!
alice
July 21, 2022, 9:21am
3
You can call .next().now_or_never()
in a loop until it fails.
The now_or_never
method comes from FutureExt
in the futures crate.
5 Likes
system
Closed
October 19, 2022, 9:21am
4
This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.