Async/await and using reqwest to execute multiple GET requests in parallel

    let futures0: Vec<impl Future<Result<Vec<u8>, QuantumError>>> = get_reqwests();
    let mut unordered = futures::stream::futures_unordered::FuturesUnordered::new();
    for fut in futures0 {
        unordered.push(fut);
    }
    
    unordered.map(|arr: Result<Vec<u8>, QuantumError>| {
        match arr {
            Ok(vec) => futures::future::ok(vec),
            Err(err) => futures::future::err(err)
        }
    }).buffer_unordered(len).concat().collect().await

Error:

the trait bound `std::result::Result<std::vec::Vec<u8>, std::boxed::Box<util::StringedError<'_, std::string::String>>>: std::iter::Extend<std::vec::Vec<u8>>` is not satisfied
   --> quantum_random\src\web_async.rs:255:30
    |
255 |     }).buffer_unordered(len).concat().collect().await
    |                              ^^^^^^ the trait `std::iter::Extend<std::vec::Vec<u8>>` is not implemented for `std::result::Result<std::vec::Vec<u8>, std::boxed::Box<util::StringedError<'_, std::string::String>>>`

The idea is that I have already constructed multiple requests, and all that is left is to execute the futures concurrently, so that I receive as much data as possible per unit of time instead of waiting for each future to complete one by one. How can I achieve this in a neat, functional way?

(The goal is to concatenate a Vec of Vecs into a single Vec.)
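
For context on the error above: every item that reaches `concat()` is a `Result<Vec<u8>, _>`, and `concat()` needs items that implement `Extend`, which `Result` does not. A minimal synchronous sketch of the intended flattening, using only the standard library, might look like this. `FetchError` and `flatten_chunks` are hypothetical names standing in for `QuantumError` and the real pipeline; this is not the async solution itself.

```rust
/// Hypothetical error type standing in for `QuantumError`.
#[derive(Debug, PartialEq)]
struct FetchError(String);

/// Flattens per-request results into one buffer, returning the first error.
fn flatten_chunks(
    chunks: Vec<Result<Vec<u8>, FetchError>>,
) -> Result<Vec<u8>, FetchError> {
    // Collecting into a `Result` short-circuits on the first `Err`.
    let ok_chunks = chunks
        .into_iter()
        .collect::<Result<Vec<Vec<u8>>, FetchError>>()?;
    // `concat` joins the inner vectors in their original order.
    Ok(ok_chunks.concat())
}
```

Calling `flatten_chunks(vec![Ok(vec![1, 2]), Ok(vec![3])])` yields `Ok(vec![1, 2, 3])`. For the async case, it may be worth checking whether `TryStreamExt::try_concat` in the futures crate fits, since it is meant for streams of `Result`s.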

It may be better to use fold in this case:

https://rust-lang-nursery.github.io/futures-api-docs/0.3.0-alpha.18/futures/stream/trait.StreamExt.html#method.fold

I decided to switch up the notation a bit, and then tried using fold:

futures::stream::iter(futures0.into_iter())
        .fold(Vec::with_capacity(length as usize), |mut acc, fut: impl Future<Output=Result<Vec<u8>, QuantumError>>| async {
            fut.await.and_then(|vec| Ok(acc.extend(vec)))
        })

Error:

type mismatch resolving `<impl core::future::future::Future as core::future::future::Future>::Output == std::vec::Vec<u8>`
   --> quantum_random\src\web_async.rs:253:10
    |
253 |         .fold(Vec::with_capacity(length as usize), |mut acc, fut| async {
    |          ^^^^ expected enum `std::result::Result`, found struct `std::vec::Vec`
    |
    = note: expected type `std::result::Result<(), util::QuantumError>`
               found type `std::vec::Vec<u8>`

I'm a bit confused as to how to combine this into a single vector. Am I using fold correctly?

You need to handle the results correctly. Also, extend does not return a new vec: it mutates the vector in place and returns ().

Try this:

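futures::stream::iter(futures0.into_iter())
        .fold(Ok(Vec::with_capacity(length as usize)), |acc: Result<Vec<u8>, QuantumError>, fut| async move {
            // Unwrap the accumulator first; once it holds an error, that error is carried forward.
            let mut acc = acc?;
            // extend mutates acc in place and returns (), so acc is returned explicitly.
            acc.extend(fut.await?);
            Ok(acc)
        })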

This compiles and runs:

    Ok(futures::stream::iter(futures0.into_iter())
        .fold(Vec::with_capacity(length as usize), |mut acc, fut| async {
            // Await the request, then either append its bytes or discard everything.
            match fut.await {
                Ok(vec) => {
                    acc.extend(vec);
                    acc
                },
                Err(_) => Vec::with_capacity(0)
            }
        }).await)

My question is: can I propagate an error from the fold? Right now I have to return an empty vec, which is not recognized as an error until a bounds check is done downstream in the logic.
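
On propagating the error: one option is a short-circuiting fold whose result is a `Result`, so the first failure is returned instead of an empty vector. The futures crate has `try_fold` on `TryStreamExt` for streams of `Result`s; the sketch below shows the same shape with the standard library's `Iterator::try_fold` so that it runs without dependencies. `FetchError` and `concat_or_fail` are hypothetical names, and the input is assumed to be already-received responses, not pending futures.

```rust
/// Hypothetical error type standing in for `QuantumError`.
#[derive(Debug, PartialEq)]
struct FetchError(String);

/// Appends each successful response to one buffer; stops at the first error.
fn concat_or_fail<I>(responses: I, capacity: usize) -> Result<Vec<u8>, FetchError>
where
    I: IntoIterator<Item = Result<Vec<u8>, FetchError>>,
{
    responses
        .into_iter()
        .try_fold(Vec::<u8>::with_capacity(capacity), |mut acc, res| match res {
            Ok(chunk) => {
                // `extend` mutates `acc` in place, so hand `acc` back explicitly.
                acc.extend(chunk);
                Ok(acc)
            }
            // Returning `Err` ends the fold early and becomes the final result.
            Err(err) => Err(err),
        })
}
```

With this shape the caller sees `Err(..)` directly, so no downstream bounds check is needed to detect a failed request.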

Also: is this necessarily parallel? When I run the program, the requests complete one after another in a suspicious way. Each request seems to start only after the previous one has finished, and the time deltas between them are about the same.
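
On the second question: Rust futures do nothing until they are polled, and `fold` awaits the future returned by its closure before it pulls the next item from the stream, so the requests in the `fold` version run strictly one after another. To overlap them, something has to poll several futures at once, for example `buffer_unordered(n)` on a stream of futures or `FuturesUnordered`. The toy model below illustrates the difference with the standard library only. Everything in it (`fake_request`, `run_all`, `YieldOnce`, the no-op waker) is a hypothetical stand-in, not how reqwest or the futures crate is implemented.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; the toy executor below simply polls in a loop.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Pending on its first poll, ready on the second, like a request awaiting a reply.
struct YieldOnce(bool);
impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            Poll::Ready(())
        } else {
            self.0 = true;
            Poll::Pending
        }
    }
}

type Log = Rc<RefCell<Vec<String>>>;
type Task = Pin<Box<dyn Future<Output = ()>>>;

/// Hypothetical stand-in for one GET request: logs when it starts and finishes.
async fn fake_request(name: &'static str, log: Log) {
    log.borrow_mut().push(format!("{name} start"));
    YieldOnce(false).await;
    log.borrow_mut().push(format!("{name} done"));
}

/// Polls every task round-robin until all of them have finished.
fn run_all(mut tasks: Vec<Task>) {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    while !tasks.is_empty() {
        // Keep only the tasks that are still pending after this poll.
        tasks.retain_mut(|task| task.as_mut().poll(&mut cx).is_pending());
    }
}

/// One task awaits the requests in turn, as the `fold` version effectively does.
fn sequential_order() -> Vec<String> {
    let log = Log::default();
    let (a, b) = (fake_request("a", log.clone()), fake_request("b", log.clone()));
    let task: Task = Box::pin(async move {
        a.await;
        b.await;
    });
    run_all(vec![task]);
    let order = log.borrow().clone();
    order
}

/// Each request is its own task, so both are polled before either finishes.
fn concurrent_order() -> Vec<String> {
    let log = Log::default();
    let a: Task = Box::pin(fake_request("a", log.clone()));
    let b: Task = Box::pin(fake_request("b", log.clone()));
    run_all(vec![a, b]);
    let order = log.borrow().clone();
    order
}
```

In the sequential case, request "b" does not even start until "a" is done, which matches the evenly spaced responses described above. Note that this kind of overlap is concurrency on one thread; it is not necessarily parallelism across threads, but for I/O-bound requests concurrency is usually what matters.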