I'm trying to implement Stream
for my struct, which has an mpsc::UnboundedReceiver
rx
.
The rx
receives a different type (let's call it SomeOtherType
) from the item I would like to return in the stream (let's call that MyItem
).
I would like to filter out any received values that cannot be converted from SomeOtherType
to MyItem
. I've already implemented TryInto
to achieve this conversion.
Here is my implementation so far:
pub struct MyStruct {
pub rx: tokio::sync::mpsc::UnboundedReceiver<SomeOtherType>,
}
impl Stream for MyStruct {
type Item = MyItem;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.rx.poll_recv(cx).map(|some_other_type| {
if let Ok(my_item) = some_other_type.try_into() {
return Some(my_item);
} else {
// ??
}
None
})
}
}
The issue I have is that I do not know what should be returned in case try_into
is Err
, since returning None
in this case would signify a closing of the stream.
From the documentation:
There are several possible return values, each indicating a distinct stream state:
Poll::Pending
means that this stream's next value is not ready yet. Implementations will ensure that the current task will be notified when the next value may be ready.
Poll::Ready(Some(val))
means that the stream has successfully produced a value, val
, and may produce further values on subsequent poll_next
calls.
Poll::Ready(None)
means that the stream has terminated, and poll_next
should not be invoked again.
Which means that you would return Poll::Pending
in the case of the failed conversion since you don't want to terminate the stream.
1 Like
As an alternative to what @firebits.io proposed, you may want to call poll_recv
in a loop, continuing the loop as long as it returns Poll::Ready(Some(value))
whose value
you filter out.
It would look something like this:
pub struct MyStruct {
pub rx: mpsc::UnboundedReceiver<SomeOtherType>,
}
impl Stream for MyStruct {
type Item = MyItem;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
let Some(value) = ready!(self.rx.poll_recv(cx)) else { return Poll::Ready(None) };
if let Ok(my_item) = value.try_into() {
return Poll::Ready(Some(my_item));
}
}
}
}
As another alternative, you could consider converting the UnboundedReceiver
to a Stream
using tokio_stream::wrappers::UnboundedReceiverStream
, and then perform the item conversion using tokio_stream::StreamExt::filter_map
ps:
If you're using a type from a crate you should mention which crate it comes from. If you just write mpsc::UnboundedReceiver
someone normally interprets this to mean that the type is in either the std::mpsc
module (but it does not contain such type) or in the mpsc
crate (which also does not contain it either!). I assumed you mean tokio::sync::mpsc::UnboundedReceiver
, but that's not a give.
Also try to post code that properly shows your intentions. The code you posted has mismatched braces and is likely missing a line or two.