try_buffer_unordered() to limit concurrent calls with TryStreamExt

While looking for a way to download several files at the same time to speed up our process, I started using try_join_all. As could be expected, the server started returning 500 errors after a few calls. try_join_all has the ideal behavior for us of short-circuiting on the first error, because a partial download won't do.

What I found instead is a way to keep at most 3 or so downloads running at once, in the Stack Overflow post "Limiting the number of concurrent futures in join_all!()". The main difference in my case, I believe, is handling the Result type.

I'm stuck on an error about not satisfying TryStreamExt, and neither the error message nor the documentation makes it clear to me how to satisfy the trait. I thought mapping/converting the error to std::io::Error with

pub fn convert_error(e: ResponseError) -> std::io::Error {
    std::io::Error::new(std::io::ErrorKind::BrokenPipe, format!("Error on streaming HTTP response body: {:?}", e))
}

but that didn't help. Maybe it has something to do with pinning?

Here is a minimal reproducible example. I left the flattening in there because I think it plays a role in the type conversions I have issues with.

use futures::{stream, TryStreamExt};

#[derive(Debug)]
struct ResponseError {
    message: String
}

async fn get_items_from_files(urls: Vec<String>) -> Result<Vec<Item>, ResponseError> {
    let mut futures = Vec::with_capacity(urls.len());
    for url in urls {
        futures.push(download_file(url));
    }

    let stream = stream::iter(futures).try_buffer_unordered(3);
    let items = stream.collect::<Vec<Vec<Item>>>().await;

    Ok(items.into_iter().flatten().collect::<Vec<Item>>())
}

struct Item {
    field: String
}

async fn download_file(url: String) -> Result<Vec<Item>, ResponseError> {

    let nb_items = 0;
    let all_attributes = Vec::new();
    //Async request that populates `all_attributes`
    println!("{} products in file", nb_items);
    Ok(all_attributes)
}

#[tokio::main]
async fn main() {
    let urls = vec![
        "https://link1.com".to_string(),
        "https://link2.com".to_string(),
        "https://link3.com".to_string(),
        "https://link4.com".to_string(),
        "https://link5.com".to_string(),
    ];

    get_items_from_files(urls).await.unwrap();
}

And the error is simply:

error[E0599]: the method `try_buffer_unordered` exists for struct `Iter<IntoIter<impl Future<Output = Result<Vec<Item>, ResponseError>>>>`, but its trait bounds were not satisfied
  --> src/main.rs:15:40
   |
15 |     let stream = stream::iter(futures).try_buffer_unordered(2);
   |                                        ^^^^^^^^^^^^^^^^^^^^ method cannot be called due to unsatisfied trait bounds
   |
  ::: /home/arnaud/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.28/src/stream/iter.rs:9:1
   |
9  | pub struct Iter<I> {
   | ------------------
   | |
   | doesn't satisfy `_: TryStreamExt`
   | doesn't satisfy `_: TryStream`
   |
   = note: the following trait bounds were not satisfied:
           `futures::stream::Iter<std::vec::IntoIter<impl futures::Future<Output = std::result::Result<std::vec::Vec<Item>, ResponseError>>>>: futures::TryStream`
           which is required by `futures::stream::Iter<std::vec::IntoIter<impl futures::Future<Output = std::result::Result<std::vec::Vec<Item>, ResponseError>>>>: futures::TryStreamExt`
           `&futures::stream::Iter<std::vec::IntoIter<impl futures::Future<Output = std::result::Result<std::vec::Vec<Item>, ResponseError>>>>: futures::TryStream`
           which is required by `&futures::stream::Iter<std::vec::IntoIter<impl futures::Future<Output = std::result::Result<std::vec::Vec<Item>, ResponseError>>>>: futures::TryStreamExt`
           `&mut futures::stream::Iter<std::vec::IntoIter<impl futures::Future<Output = std::result::Result<std::vec::Vec<Item>, ResponseError>>>>: futures::TryStream`
           which is required by `&mut futures::stream::Iter<std::vec::IntoIter<impl futures::Future<Output = std::result::Result<std::vec::Vec<Item>, ResponseError>>>>: futures::TryStreamExt`

warning: unused import: `TryStreamExt`
 --> src/main.rs:1:23
  |
1 | use futures::{stream, TryStreamExt};
  |                       ^^^^^^^^^^^^
  |
  = note: `#[warn(unused_imports)]` on by default

For more information about this error, try `rustc --explain E0599`.
warning: `stream-test2` (bin "stream-test2") generated 1 warning
error: could not compile `stream-test2` (bin "stream-test2") due to previous error; 1 warning emitted

The error is saying that your type doesn't satisfy the TryStreamExt trait bound.
These are the implementations you need to know about:

  • stream::iter always returns a Stream. Whether that Stream is also a TryStream (and therefore gets the TryStreamExt methods) depends on its item type.
  • It is a TryStream only when each item is a Result. In your case each item is a future that resolves to a Result, not a Result itself, hence the error (Rust Playground).
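// futures::stream::iter (simplified from the futures-util documentation)
pub fn iter<I>(i: I) -> Iter<<I as IntoIterator>::IntoIter>
where
    I: IntoIterator;

// Iter<I> is a Stream whose items are exactly the iterator's items
impl<I> Stream for Iter<I>
where
    I: Iterator,
{
    type Item = <I as Iterator>::Item;
    // ...
}

pub trait Stream { /* ... */ }

// A Stream is a TryStream only if its items are `Result`s
pub trait TryStream: Stream + Sealed { /* ... */ }
impl<S, T, E> TryStream for S
where
    S: Stream<Item = Result<T, E>> + ?Sized,
{ /* ... */ }

// TryStreamExt (which provides try_buffer_unordered) exists for every TryStream
pub trait TryStreamExt: TryStream { /* ... */ }
impl<S> TryStreamExt for S
where
    S: TryStream + ?Sized,
{}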

One solution is to use the methods from StreamExt instead (Rust Playground):

-    let stream = stream::iter(futures).try_buffer_unordered(3);
+    let stream = stream::iter(futures)
+        .buffer_unordered(3)
+        .filter_map(|res| async move { res.ok() });

To expand on @vague's solution: using filter_map will silently discard any errors. If you want to short-circuit on the first error instead, remove the filter_map and replace .collect() with .try_collect(). Moreover, since you also want to concatenate all the Vecs, you can replace both the collect and the flatten with a single try_concat (Rust Playground).


Expanding even more:

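use futures::{stream, StreamExt, TryStreamExt};

async fn get_items_from_files(urls: Vec<String>) -> Result<Vec<Item>, ResponseError> {
    stream::iter(urls)
        .map(download_file)   // StreamExt: String -> future of Result<Vec<Item>, _>
        .buffer_unordered(3)  // StreamExt: at most 3 downloads in flight
        .try_concat()         // TryStreamExt: append the Vecs, stop on the first Err
        .await
}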

The original issue is that try_buffer_unordered works on a stream of Result<Future<Output = Result<T, E>>, E>, whereas you have a stream of Future<Output = Result<T, E>>. It's useful when producing each future can itself fail, i.e. when two fallible steps are chained. You only have one fallible step, the future itself, so buffer_unordered is fine.
