use futures::future::FutureExt;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::sync::broadcast;
use tokio::sync::broadcast::error::RecvError;
use tokio::time::{self, Duration};
use futures::stream::{Stream, StreamExt};
/// Thin wrapper around a tokio broadcast receiver that carries `u32` values.
struct RxChan {
    /// Underlying broadcast receiver that [`RxChan::recv`] delegates to.
    inner: broadcast::Receiver<u32>,
}

impl RxChan {
    /// Wraps an existing broadcast receiver.
    pub fn new(inner: broadcast::Receiver<u32>) -> Self {
        RxChan { inner }
    }

    /// Waits for the next value from the wrapped receiver and forwards the
    /// receiver's result unchanged.
    pub async fn recv(&mut self) -> Result<u32, RecvError> {
        let pending = self.inner.recv();
        pending.await
    }
}
struct RxChanStream {
inner: RxChan,
}
impl RxChanStream {
pub fn new(chan: RxChan) -> Self {
Self { inner: chan }
}
}
impl Stream for RxChanStream {
type Item = u32;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut future = Box::pin(self.inner.recv());
match future.poll_unpin(cx) {
Poll::Pending => {
println!("Poll::Pending");
Poll::Pending
}
Poll::Ready(Ok(data)) => {
println!("Poll::Ready({})", data);
Poll::Ready(Some(data))
}
Poll::Ready(Err(_err)) => {
println!("Poll::Ready(None)");
Poll::Ready(None)
}
}
}
}
/// Demo driver: a spawned task sends one value after a short delay while the
/// main task drains the stream, printing every value it receives.
#[tokio::main]
async fn main() {
    let (sender, receiver) = broadcast::channel::<u32>(16);
    let mut stream = RxChanStream::new(RxChan::new(receiver));

    // The sender is moved into the task, so it is dropped when the task ends.
    let sender_task = tokio::spawn(async move {
        let delay = Duration::from_millis(20);
        time::sleep(delay).await;
        sender.send(0xffeeddff_u32).unwrap();
    });

    // Runs until the stream reports its end with `None`.
    while let Some(value) = stream.next().await {
        println!("Received {:x?}", value);
    }

    sender_task.await.unwrap();
    println!("Hello, world!");
}
I implemented a simple stream on top of the tokio broadcast channel. However, the stream seems to be polled only once, after which the entire program hangs.
How can I fix this?