Thanks john. So how should I poll the send() future? From what I understand from your comment, I should copy or borrow it to poll to prevent it from dropped? If it is dropped, why there are no compile nor runtime errors?
Well to keep it around, you would have to store the return value of send() in a field of your struct, and keep polling that field on every call to poll_next until the send() future returns Poll::Ready.
Additionally be aware that for_each_concurrent does not do what you think. It wont parallelize obtaining items from the stream (it can't). The async block with the println! is what is parallelized. Of course, when it has no .awaits, there's nothing for it to parallelize, so even if your stream was correct, it would still not be concurrent.
In this case you should probably not be making your own stream and just use FuturesUnordered.
use futures::future::StreamExt; // for .next()
let mut futs = FuturesUnordered::new();
let mut results = Vec::new();
for request in requests {
futs.push(request.send());
if futs.len() >= 10 {
results.push(futs.next().await.expect("empty but has 10 items")?);
}
}
while let Some(res) = futs.next().await {
results.push(res?);
}
Obtaining values from the stream would not be concurrent in any way. It would be exactly like any other loop over the stream. The part that is concurrent is the async block. Once the items are returned by the stream, the web requests I use as an example would be run concurrently.
As for concurrent vs parallel, well that's merely that for_each_concurrent wont run the async blocks on several threads. It runs them concurrently in a single thread in the manner described here.