Taking from stream while OK

Hi,

is there a nicer way to write this:

try_stream
    .take_while(|x| future::ready(x.is_ok()))
    .map(|x| x.unwrap())

I need this quite often, but the unwrap is ugly despite being perfectly safe in this case.

If a functional style isn't required:

while let Some(Ok(x)) = try_stream.next().await { ... }

Thanks. I have seen that one, but this is inside a large stream transformation pipeline, so FP is required.

You can move that to an extension trait if you use it frequently
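
To show the shape of that, here is a minimal sketch of the extension-trait pattern. It is written against std's Iterator rather than Stream so that it compiles without external crates; the trait name TakeWhileOk, the method take_while_ok and the helper ok_prefix are all made up for illustration. For a Stream the trait would look the same, with the method body being the take_while/map pair from the question.

```rust
use std::iter::MapWhile;

/// Hypothetical extension trait (the name is made up for this sketch).
/// It adds `take_while_ok` to every iterator over `Result<T, E>`.
trait TakeWhileOk<T, E>: Iterator<Item = Result<T, E>> + Sized {
    /// Yields the `Ok` payloads and stops at the first `Err`, which is dropped.
    fn take_while_ok(self) -> MapWhile<Self, fn(Result<T, E>) -> Option<T>> {
        self.map_while(Result::ok as fn(Result<T, E>) -> Option<T>)
    }
}

// Blanket impl: any iterator with the right item type gets the method.
impl<I, T, E> TakeWhileOk<T, E> for I where I: Iterator<Item = Result<T, E>> {}

/// Usage: the call site reads like any other built-in adapter.
fn ok_prefix(input: Vec<Result<u32, String>>) -> Vec<u32> {
    input.into_iter().take_while_ok().map(|x| x * 10).collect()
}
```

The unwrap disappears because map_while does the "is it Ok" check and the extraction in one step.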

If you need it often, you can use this utility:

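use std::pin::Pin;
use std::task::{Context, Poll};
use futures::stream::Stream;

/// Yields the `Ok` values of the inner stream and ends at the first `Err`.
struct OkStream<S> {
    // `None` once the inner stream has ended or produced an error.
    inner: Option<S>,
}

impl<S, T, E> Stream for OkStream<S>
where
    S: Stream<Item = Result<T, E>>,
{
    type Item = T;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
        // SAFETY: `inner` is never moved out of `me`; it is only polled in
        // place or dropped in place by the assignment below.
        let me = unsafe { self.get_unchecked_mut() };
        match &mut me.inner {
            Some(inner) => {
                // SAFETY: `inner` lives inside the pinned `OkStream`, so it is pinned too.
                let inner = unsafe { Pin::new_unchecked(inner) };
                match inner.poll_next(cx) {
                    Poll::Ready(Some(Ok(item))) => Poll::Ready(Some(item)),
                    Poll::Pending => Poll::Pending,
                    // End of stream or an error: drop the inner stream and finish.
                    // Note that the error value itself is discarded here.
                    _ => {
                        me.inner = None;
                        Poll::Ready(None)
                    },
                }
            },
            None => Poll::Ready(None),
        }
    }
}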

Thanks @alice, that is awesome. Did you write this just to answer my question, or did you have it lying around?

Is there a way to do this without unsafe?

I wrote it for this question. The unsafe can be avoided using the pin-project or pin-project-lite crates, which allow you to hide the unsafe behind a macro.
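
A third option, different from the pin-project route, is to require the inner stream to be Unpin, in which case no pin projection is needed at all. Below is a minimal sketch of that variant. To keep it compilable without external crates it declares a local Stream trait with the same shape as futures::stream::Stream; in real code you would import the trait from futures instead. FromIter, NoopWake and drain are hypothetical helpers that exist only to exercise the sketch.

```rust
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Local stand-in with the same shape as `futures::stream::Stream` (assumption:
// used here only so the sketch builds without the `futures` crate).
trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

struct OkStream<S> {
    inner: Option<S>,
}

impl<S, T, E> Stream for OkStream<S>
where
    S: Stream<Item = Result<T, E>> + Unpin,
{
    type Item = T;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
        // `OkStream<S>` is `Unpin` because `S` is, so `get_mut` is safe.
        let me = self.get_mut();
        match &mut me.inner {
            Some(inner) => match Pin::new(inner).poll_next(cx) {
                Poll::Ready(Some(Ok(item))) => Poll::Ready(Some(item)),
                Poll::Pending => Poll::Pending,
                // End of stream or an error: stop for good.
                _ => {
                    me.inner = None;
                    Poll::Ready(None)
                }
            },
            None => Poll::Ready(None),
        }
    }
}

// Hypothetical helper: a stream that is always ready and yields an iterator's items.
struct FromIter<I>(I);

impl<I: Iterator + Unpin> Stream for FromIter<I> {
    type Item = I::Item;
    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<I::Item>> {
        Poll::Ready(self.get_mut().0.next())
    }
}

// Hypothetical helper: a waker that does nothing, enough to poll by hand.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Hypothetical helper: polls a stream until it ends or is not ready.
fn drain<S: Stream + Unpin>(mut s: S) -> Vec<S::Item> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut out = Vec::new();
    while let Poll::Ready(Some(item)) = Pin::new(&mut s).poll_next(&mut cx) {
        out.push(item);
    }
    out
}
```

The trade-off is the extra Unpin bound. With the real futures trait, a stream that is not Unpin can usually be wrapped in Box::pin first, at the cost of an allocation.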

Please note that this will throw away the error, which is usually not what you want. Usually, errors should be handled or at least reported.
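
If you want to stop at the first error but still be able to look at it afterwards, the adapter can stash the error instead of dropping it. Here is a rough sketch of that idea, again on a plain Iterator so it builds with std alone; UntilErr and its error field are names invented for this example. The same change would apply to OkStream: store the Err in a field in the catch-all arm rather than discarding it.

```rust
/// Hypothetical adapter: yields `Ok` payloads, stops at the first `Err`,
/// and keeps that error so the caller can inspect it afterwards.
struct UntilErr<I, E> {
    iter: I,
    error: Option<E>,
}

impl<I, T, E> UntilErr<I, E>
where
    I: Iterator<Item = Result<T, E>>,
{
    fn new(iter: I) -> Self {
        UntilErr { iter, error: None }
    }
}

impl<I, T, E> Iterator for UntilErr<I, E>
where
    I: Iterator<Item = Result<T, E>>,
{
    type Item = T;

    fn next(&mut self) -> Option<T> {
        // Once an error has been recorded, stay finished.
        if self.error.is_some() {
            return None;
        }
        match self.iter.next()? {
            Ok(item) => Some(item),
            Err(e) => {
                // Remember the error instead of throwing it away.
                self.error = Some(e);
                None
            }
        }
    }
}
```

Consume it through by_ref() so the adapter is still around afterwards, then check the error field to decide whether to report, retry or resume.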

Yeah, in this case I am streaming something that could possibly become unavailable and just resuming at the last successful position in case of an error, so it should be fine. If something really unexpected happens, I just panic the entire process.

But that is probably the reason this is not one of the standard combinators in TryStreamExt...