Creating a filtered Stream from a Receiver

I'm trying to implement Stream for my struct, which has an mpsc::UnboundedReceiver rx.

The rx receives a different type (let's call it SomeOtherType) from the item I would like to return in the stream (let's call that MyItem).

I would like to filter out any received values that cannot be converted from SomeOtherType to MyItem. I've already implemented TryInto to achieve this conversion.

Here is my implementation so far:

pub struct MyStruct {
    pub rx: tokio::sync::mpsc::UnboundedReceiver<SomeOtherType>,
}

impl Stream for MyStruct {
    type Item = MyItem;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        self.rx.poll_recv(cx).map(|some_other_type| {
                if let Ok(my_item) = some_other_type.try_into() {
                    return Some(my_item);
                } else {
                    // ??
                }
                None
        })
    }
}

The issue I have is that I do not know what should be returned in case try_into is Err, since returning None in this case would signify a closing of the stream.

From the documentation:

Return value

There are several possible return values, each indicating a distinct stream state:

  • Poll::Pending means that this stream's next value is not ready yet. Implementations will ensure that the current task will be notified when the next value may be ready.
  • Poll::Ready(Some(val)) means that the stream has successfully produced a value, val, and may produce further values on subsequent poll_next calls.
  • Poll::Ready(None) means that the stream has terminated, and poll_next should not be invoked again.

Which means that you would return Poll::Pending in the case of the failed conversion since you don't want to terminate the stream.

1 Like

As an alternative to what @firebits.io proposed, you may want to call poll_recv in a loop, continuing the loop as long as it returns Poll::Ready(Some(value)) whose value you filter out.

It would look something like this:

pub struct MyStruct {
    pub rx: mpsc::UnboundedReceiver<SomeOtherType>,
}

impl Stream for MyStruct {
    type Item = MyItem;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            let Some(value) = ready!(self.rx.poll_recv(cx)) else { return Poll::Ready(None) };
            if let Ok(my_item) = value.try_into() {
                return Poll::Ready(Some(my_item));
            }
        }
    }
}

As another alternative, you could consider converting the UnboundedReceiver to a Stream using tokio_stream::wrappers::UnboundedReceiverStream, and then perform the item conversion using tokio_stream::StreamExt::filter_map


ps:

If you're using a type from a crate you should mention which crate it comes from. If you just write mpsc::UnboundedReceiver someone normally interprets this to mean that the type is in either the std::mpsc module (but it does not contain such type) or in the mpsc crate (which also does not contain it either!). I assumed you mean tokio::sync::mpsc::UnboundedReceiver, but that's not a give.

Also try to post code that properly shows your intentions. The code you posted has mismatched braces and is likely missing a line or two.