Trying to implement futures::stream::Stream but it does not advance

Hi all!

I'm playing around with async rust and I'm trying to solve the toy problem of receiving all content from a paginated web resource. That is, each page contains something I can parse into a struct PageContent and also contains an Option<NextLink>.

I want to implement this using a futures::stream::Stream but can't seem to get the poll_next right. It appears that my handling of the Waker is not correct. To elaborate, let me give you some pseudocode:

struct PageContent {/*...*/}

struct NextLink {/*...*/}

struct PaginationStream {
    next_link: Option<Arc<NextLink>>
}

/// Asynchronously retrieve the body of the HTTP GET to `url` as `String`.
async fn get_response_body(url: &NextLink) -> String {/*...*/}

fn parse_body(body: String) -> (PageContent, Option<NextLink>) {/*...*/}

I tried the following implementation of poll_next for the Stream trait (again, some details omitted for readability). However, it gets stuck even before making the first HTTP GET, which I verified using plain old debug prints.

impl Stream for PaginationStream {
    type Item = PageContent;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {

        match &self.next_link {
            None => Poll::Ready(None),
            Some(arc_link) => {
                let arc_link = arc_link.clone();
                let mut fut = Box::pin(async move {
                    println!("before poll body");
                    let body = get_response_body(&*arc_link).await;
                    println!("after poll body");
                    let t = parse_body(body);

                    self.next_link = t.1.map(|l| Arc::new(l));
                    Some(t.0)
                });

                fut.as_mut().poll(cx)
            }
        }
    }
}

Concretely, block_on(my_pagination_stream_instance.next()) (.next from the StreamExt trait) hangs forever after printing "before poll body".

I guess I did something wrong with the Context and its Waker, i.e. that the poll in the last line does not wake up the correct task.

Do you see what could have gone wrong?

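Every time poll_next is called, you create a brand-new future that starts a new get_response_body call, poll it once, and drop it as soon as poll_next returns, so no request ever gets the chance to finish. This isn't what you want. To fix this, store the future inside the stream and poll that same future again the next time poll_next is entered.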
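Here is a rough sketch of what storing the future could look like. It is not your real code: to keep it free of external crates I declare a local Stream trait with the same shape as futures::stream::Stream, and get_response_body / parse_body are fake stand-ins. The in_flight field, YieldOnce, NoopWake and collect_pages are names I made up for the illustration. I also dropped the Arc, since the stored future can simply own the link.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Assumption: local stand-in with the same shape as futures::stream::Stream.
trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

struct PageContent(String);
struct NextLink(u32);

// Hypothetical helper: returns Pending exactly once, to mimic waiting on I/O.
struct YieldOnce(bool);
impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            return Poll::Ready(());
        }
        self.0 = true;
        cx.waker().wake_by_ref(); // ask to be polled again
        Poll::Pending
    }
}

// Fake "HTTP GET": the body is just the page number (assumption for the demo).
async fn get_response_body(link: &NextLink) -> String {
    YieldOnce(false).await;
    link.0.to_string()
}

// Fake parser: pages 1 and 2 link to the next page, page 3 is the last one.
fn parse_body(body: String) -> (PageContent, Option<NextLink>) {
    let n: u32 = body.parse().expect("fake body is a page number");
    let next = if n < 3 { Some(NextLink(n + 1)) } else { None };
    (PageContent(format!("page {}", n)), next)
}

type PageFuture = Pin<Box<dyn Future<Output = (PageContent, Option<NextLink>)>>>;

struct PaginationStream {
    next_link: Option<NextLink>,
    // The request currently in progress, kept alive across poll_next calls.
    in_flight: Option<PageFuture>,
}

impl Stream for PaginationStream {
    type Item = PageContent;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut(); // fine here: every field is Unpin
        if this.in_flight.is_none() {
            // Only start a new request when none is pending.
            match this.next_link.take() {
                None => return Poll::Ready(None),
                Some(link) => {
                    this.in_flight = Some(Box::pin(async move {
                        let body = get_response_body(&link).await;
                        parse_body(body)
                    }));
                }
            }
        }
        let fut = this.in_flight.as_mut().expect("future was just stored");
        match fut.as_mut().poll(cx) {
            // Keep the future; the next poll_next call resumes it.
            Poll::Pending => Poll::Pending,
            Poll::Ready((content, next)) => {
                this.in_flight = None;
                this.next_link = next;
                Poll::Ready(Some(content))
            }
        }
    }
}

// Waker that does nothing; good enough for the busy-polling driver below.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Demo driver: polls in a loop instead of using a real executor.
fn collect_pages() -> Vec<String> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut stream = PaginationStream { next_link: Some(NextLink(1)), in_flight: None };
    let mut pages = Vec::new();
    loop {
        match Pin::new(&mut stream).poll_next(&mut cx) {
            Poll::Ready(Some(page)) => pages.push(page.0),
            Poll::Ready(None) => break,
            Poll::Pending => continue,
        }
    }
    pages
}
```

The important part is that poll_next only builds a new future when in_flight is empty, and only clears it once that future returns Ready. With the real futures crate you would implement futures::stream::Stream instead of the local trait and drive it with block_on and StreamExt::next as in your question; collect_pages is only there so the sketch can be exercised without an executor.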


Ah - yes! Now that you've pointed it out, my mistake seems obvious :slight_smile: Thank you very much, your solution works perfectly.
