Stream stays in Pending status; no progress can be made

use futures::future::FutureExt;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::sync::broadcast;
use tokio::sync::broadcast::error::RecvError;
use tokio::time::{self, Duration};
use futures::stream::{Stream, StreamExt};


struct RxChan {
    inner: broadcast::Receiver<u32>,
}

impl RxChan {
    pub fn new(inner: broadcast::Receiver<u32>) -> Self {
        Self { inner }
    }
    pub async fn recv(&mut self) -> Result<u32, RecvError> {
        self.inner.recv().await
    }
}

struct RxChanStream {
    inner: RxChan,
}

impl RxChanStream {
    pub fn new(chan: RxChan) -> Self {
        Self { inner: chan }
    }
}

impl Stream for RxChanStream {
    type Item = u32;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut future = Box::pin(self.inner.recv());

        match future.poll_unpin(cx) {
            Poll::Pending => {
                println!("Poll::Pending");
                Poll::Pending
            }
            Poll::Ready(Ok(data)) => {
                println!("Poll::Ready({})", data);
                Poll::Ready(Some(data))
            }
            Poll::Ready(Err(_err)) => {
                println!("Poll::Ready(None)");
                Poll::Ready(None)
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = broadcast::channel(16);

    let mut stream = RxChanStream::new(RxChan::new(rx));

    let join = tokio::spawn(async move {
        time::sleep(Duration::from_millis(20)).await;
        tx.send(0xffeeddff_u32).unwrap();
    });

    while let Some(val) = stream.next().await {
        println!("Received {:x?}", val);
    }

    join.await.unwrap();

    println!("Hello, world!");
}

I implemented a simple stream based on the Tokio broadcast channel. However, the stream seems to be polled only once, and then the entire program hangs.

How could I fix this?

The problem is that your poll_next function is doing this:

  1. Create a future by calling self.inner.recv().
  2. Poll that future.
  3. Destroy the future.

Unfortunately, destroying the future will cause Tokio to deregister the notifications you need to continue performing work, because the waker is stored inside that future.

To fix this, use the BroadcastStream utility from the tokio-stream crate. This is a wrapper around a broadcast receiver that implements the Stream trait correctly by storing the future object somewhere it is not destroyed between calls to poll_next.

Thanks, @alice. I know there is a BroadcastStream, but I want to implement some complex logic that wraps the underlying broadcast channel. The code above is just an illustration of the concept that confused me.

How could I fix this without using BroadcastStream?

You need to ensure that the future returned by recv is not destroyed between calls to poll_next. You can read the source code of BroadcastStream to see how it accomplishes this.

In general, I would recommend that your custom stream uses a BroadcastStream internally to implement its more complex logic, rather than trying to use recv directly. You can call poll_next on the BroadcastStream to do that. Alternatively, consider using the async-stream crate, or perhaps reconsider whether you need a Stream at all — a struct with an async fn next not tied to any trait may be enough for your needs.
