Why does my stream implementation block?

I'm trying to make HTTP requests concurrently with futures, with a concurrency limit.

I came up with the code below. It basically makes a stream from a list of RequestBuilders and uses for_each_concurrent to limit the concurrency.

But it just blocks forever after the first request is pending.

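// request_stream.rs
use std::pin::Pin;
use std::task::{Context, Poll};

use futures::{FutureExt, Stream}; // FutureExt provides poll_unpin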
pub struct Requests {
    requests: Vec<reqwest::RequestBuilder>,
}

pub fn new(requests: Vec<reqwest::RequestBuilder>) -> Requests {
    Requests { requests }
}

impl Stream for Requests {
    type Item = Result<reqwest::Response, reqwest::Error>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        let req = self.requests.pop();
        match req {
            Some(req) => {
                let poll_res = req.send().poll_unpin(cx);
                match poll_res {
                    Poll::Pending => {
                        println!("pending");
                        return Poll::Pending;
                    }
                    Poll::Ready(res) => {
                        println!("ready");
                        return Poll::Ready(Some(res));
                    }
                }
            }
            None => {
                println!("drained");
                return Poll::Ready(None);
            }
        }
    }
}
// main.rs
use futures::StreamExt; // provides for_each_concurrent

// PIC_CND, http() and GenericErr are defined elsewhere in the project.
async fn download(no: &str, hash: String) -> Result<(), GenericErr> {
    let mut reqs: Vec<reqwest::RequestBuilder> = vec![];

    for i in 1..200 {
        let url = format!("{}/{}-{}/{:>03}", PIC_CND, no, hash, i);
        reqs.push(http().get(&url));
    }
    let req_stream = request_stream::new(reqs); 
    req_stream.for_each_concurrent(10, |req| async move {
        println!("{:?}", req);
    }).await;

    Ok(())
}

What's wrong with this implementation? Learner here, please be gentle if I've done something stupid😝

Thanks.

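req.send() -> makes a (temporary) future.
.poll_unpin(cx); -> polls it once; the future is then dropped at the end of the statement, even if it was still pending.

Because poll_next then returns Poll::Pending while the future that registered the waker is gone, typically nothing wakes the task again, so for_each_concurrent waits forever. The request you popped from the Vec is lost as well.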

(I haven't read all the code or looked closely at the problem.)

Thanks john. So how should I poll the send() future? From your comment, I understand that I should copy or borrow it while polling so that it isn't dropped? If it is dropped, why are there no compile or runtime errors?

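Dropping a future is allowed in Rust (dropping is how an async operation gets cancelled), so neither the compiler nor the runtime reports an error. To keep the future around, you would have to store the return value of send() in a field of your struct and keep polling that field on every call to poll_next until the send() future returns Poll::Ready.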

Additionally, be aware that for_each_concurrent does not do what you think. It won't make obtaining items from the stream concurrent (it can't). The async block with the println! is what runs concurrently. Of course, since that block contains no .awaits, there is nothing in it to overlap, so even if your stream were correct, it would still not be concurrent.

In this case, you probably shouldn't write your own stream at all; just use FuturesUnordered:

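use futures::stream::{FuturesUnordered, StreamExt}; // StreamExt provides .next()

async fn send_all(
    requests: Vec<reqwest::RequestBuilder>,
) -> Result<Vec<reqwest::Response>, reqwest::Error> {
    let mut futs = FuturesUnordered::new();
    let mut results = Vec::new(); // filled in completion order, not request order
    for request in requests {
        futs.push(request.send());
        // Keep at most 10 requests in flight: wait for one to finish first.
        if futs.len() >= 10 {
            results.push(futs.next().await.expect("not empty: it holds 10 futures")?);
        }
    }
    // Wait for the remaining requests.
    while let Some(res) = futs.next().await {
        results.push(res?);
    }
    Ok(results)
}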

I read in the docs that for_each_concurrent is not parallel, but I didn't understand which part is concurrent. Thanks for your guidance!

Let's say you did this:

stream.for_each_concurrent(10, |i| async move {
    send_web_request_using_item(i).await;
}).await;

Obtaining values from the stream would not be concurrent in any way; it would be exactly like any other loop over the stream. The part that is concurrent is the async block: once the stream has returned the items, the web requests in my example would run concurrently.

As for concurrent vs. parallel, the difference is merely that for_each_concurrent won't run the async blocks on several threads. It runs them concurrently on a single thread, in the manner described here.
