While looking for a way to download several files at the same time to speed up our process, I started using try_join_all, but as could be expected, the server began returning 500 once a few calls were in flight. try_join_all has the ideal behavior for us of short-circuiting on the first error, because a partial download won't do.
What I found is a way to keep a sort of sliding window of at most 3 or so downloads running at once, in this Stack Overflow post: Limiting the number of concurrent futures in join_all!(). The main difference with my case, I believe, is handling the Result type.
I'm stuck with an error about not satisfying TryStreamExt, and it is not clear to me from the error message or the documentation how I can satisfy the trait. I thought converting the error to std::io::Error would help, with
pub fn convert_error(e: ResponseError) -> std::io::Error {
    std::io::Error::new(std::io::ErrorKind::BrokenPipe, format!("Error on streaming HTTP response body: {:?}", e))
}
but that didn't help. Maybe it has something to do with pinning?
Here is a minimal reproducible example. I left the flattening in there because I think it plays a role in the type conversions I have issues with.
use futures::{stream, TryStreamExt};

#[derive(Debug)]
struct ResponseError {
    message: String
}

async fn get_items_from_files(urls: Vec<String>) -> Result<Vec<Item>, ResponseError> {
    let mut futures = Vec::with_capacity(urls.len());

    for url in urls {
        futures.push(download_file(url));
    }

    let stream = stream::iter(futures).try_buffer_unordered(3);
    let items = stream.collect::<Vec<Vec<Item>>>().await;
    Ok(items.into_iter().flatten().collect::<Vec<Item>>())
}

struct Item {
    field: String
}

async fn download_file(url: String) -> Result<Vec<Item>, ResponseError> {
    let nb_items = 0;
    let all_attributes = Vec::new();
    // Async request that populates `all_attributes`
    println!("{} products in file", nb_items);
    Ok(all_attributes)
}

#[tokio::main]
async fn main() {
    let urls = vec![
        "https://link1.com".to_string(),
        "https://link2.com".to_string(),
        "https://link3.com".to_string(),
        "https://link4.com".to_string(),
        "https://link5.com".to_string(),
    ];
    get_items_from_files(urls).await.unwrap();
}
And the error is simply:
error[E0599]: the method `try_buffer_unordered` exists for struct `Iter<IntoIter<impl Future<Output = Result<Vec<Item>, ResponseError>>>>`, but its trait bounds were not satisfied
  --> src/main.rs:15:40
   |
15 |     let stream = stream::iter(futures).try_buffer_unordered(2);
   |                                        ^^^^^^^^^^^^^^^^^^^^ method cannot be called due to unsatisfied trait bounds
   |
  ::: /home/arnaud/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.28/src/stream/iter.rs:9:1
   |
9  | pub struct Iter<I> {
   | ------------------
   | |
   | doesn't satisfy `_: TryStreamExt`
   | doesn't satisfy `_: TryStream`
   |
   = note: the following trait bounds were not satisfied:
           `futures::stream::Iter<std::vec::IntoIter<impl futures::Future<Output = std::result::Result<std::vec::Vec<Item>, ResponseError>>>>: futures::TryStream`
           which is required by `futures::stream::Iter<std::vec::IntoIter<impl futures::Future<Output = std::result::Result<std::vec::Vec<Item>, ResponseError>>>>: futures::TryStreamExt`
           `&futures::stream::Iter<std::vec::IntoIter<impl futures::Future<Output = std::result::Result<std::vec::Vec<Item>, ResponseError>>>>: futures::TryStream`
           which is required by `&futures::stream::Iter<std::vec::IntoIter<impl futures::Future<Output = std::result::Result<std::vec::Vec<Item>, ResponseError>>>>: futures::TryStreamExt`
           `&mut futures::stream::Iter<std::vec::IntoIter<impl futures::Future<Output = std::result::Result<std::vec::Vec<Item>, ResponseError>>>>: futures::TryStream`
           which is required by `&mut futures::stream::Iter<std::vec::IntoIter<impl futures::Future<Output = std::result::Result<std::vec::Vec<Item>, ResponseError>>>>: futures::TryStreamExt`

warning: unused import: `TryStreamExt`
 --> src/main.rs:1:23
  |
1 | use futures::{stream, TryStreamExt};
  |                       ^^^^^^^^^^^^
  |
  = note: `#[warn(unused_imports)]` on by default

For more information about this error, try `rustc --explain E0599`.
warning: `stream-test2` (bin "stream-test2") generated 1 warning
error: could not compile `stream-test2` (bin "stream-test2") due to previous error; 1 warning emitted