Hi all!
I'm playing around with async Rust and I'm trying to solve the toy problem of receiving all content from a paginated web resource. That is, each page contains something I can parse into a struct PageContent and also contains an Option<NextLink>.
I want to implement this using a futures::stream::Stream but can't seem to get poll_next right. It appears that my handling of the Waker is not correct. To elaborate, let me give you some pseudocode:
struct PageContent {/*...*/}
struct NextLink {/*...*/}
struct PaginationStream {
    next_link: Option<Arc<NextLink>>,
}
/// Asynchronously retrieve the body of the HTTP GET to `url` as `String`.
async fn get_body(url: &NextLink) -> String {/*...*/}
fn parse_body(body: String) -> (PageContent, Option<NextLink>) {/*...*/}
When I implemented poll_next for the Stream trait (again, some details omitted for readability), I tried the following. However, it gets stuck even before making the first HTTP GET (which I verified using plain old debug prints).
impl Stream for PaginationStream {
    type Item = PageContent;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        match &self.next_link {
            None => Poll::Ready(None),
            Some(arc_link) => {
                let arc_link = arc_link.clone();
                // Note: this future is created anew on every call to poll_next.
                let mut fut = Box::pin(async move {
                    println!("before poll body");
                    let body = get_body(&*arc_link).await;
                    println!("after poll body");
                    let t = parse_body(body);
                    self.next_link = t.1.map(|l| Arc::new(l));
                    Some(t.0)
                });
                fut.as_mut().poll(cx)
            }
        }
    }
}
However, block_on(my_pagination_stream_instance.next()) (.next from the StreamExt trait) is stuck forever after it printed "before poll body".
I guess I did something wrong with the Context and its Waker, i.e. that the poll in the last line does not wake up the correct task.
Do you see what could have gone wrong?
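One thing I suspect, but have not confirmed, is that the problem is not the Waker itself. The boxed future is created inside poll_next and dropped as soon as that call returns, so every poll might start a brand-new request instead of resuming the previous one. To check that idea in isolation, here is a minimal std-only sketch. All names in it (YieldOnce, new_request, Fetcher, polls_until_ready) are hypothetical stand-ins I made up for this experiment: YieldOnce plays the role of get_body, and the two methods mirror poll_next without the Stream trait.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for the HTTP request: pending on the first poll,
/// ready on the second.
struct YieldOnce {
    yielded: bool,
}

impl Future for YieldOnce {
    type Output = String;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<String> {
        if self.yielded {
            Poll::Ready("body".to_string())
        } else {
            self.yielded = true;
            cx.waker().wake_by_ref(); // ask to be polled again
            Poll::Pending
        }
    }
}

/// Creates a new boxed "request" future.
fn new_request() -> Pin<Box<dyn Future<Output = String>>> {
    Box::pin(YieldOnce { yielded: false })
}

/// Waker that does nothing; the driver below simply polls in a loop.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Keeps the in-flight future between polls (assumption: this is the piece
/// missing from my PaginationStream).
struct Fetcher {
    in_flight: Option<Pin<Box<dyn Future<Output = String>>>>,
}

impl Fetcher {
    /// Mirrors my current code: a fresh future on every call, dropped on return.
    fn poll_fresh(&mut self, cx: &mut Context<'_>) -> Poll<String> {
        let mut fut = new_request();
        fut.as_mut().poll(cx)
    }

    /// Reuses the stored future, so its progress survives across polls.
    fn poll_stored(&mut self, cx: &mut Context<'_>) -> Poll<String> {
        let fut = self.in_flight.get_or_insert_with(new_request);
        let result = fut.as_mut().poll(cx);
        if result.is_ready() {
            self.in_flight = None; // the next call starts a new request
        }
        result
    }
}

/// Polls `f` at most `max` times and returns the number of the poll on which
/// it became ready, or `None` if it never did.
fn polls_until_ready(
    max: usize,
    mut f: impl FnMut(&mut Context<'_>) -> Poll<String>,
) -> Option<usize> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    (1..=max).find(|_| f(&mut cx).is_ready())
}
```

Driving poll_fresh through polls_until_ready never reaches Ready, no matter how many polls are allowed, while poll_stored finishes once the same future has been polled again. If the same thing applies to my stream, keeping the pending future as a field of PaginationStream (or building the stream with futures::stream::unfold) might be the direction to go, but I would appreciate confirmation.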