Async parallel download and method chaining

I've been struggling to get store_result to execute in this method chain:


    
    let (tasks, files): (Vec<_>, Vec<_>) = result_files
        .into_iter()
        .filter(|x| should_download_file(&provider_id_path, x.filename.clone()))
        .map(download_file)
        .unzip();

    let results = future::join_all(tasks).await;

    let store_result = |x: (Result<Response, Error>, Result<File, std::io::Error>)| async {
        //info!("Storing result: {:?}, {:?}", x.0, x.1);

        // make sure both are Oks
        let (result, file) = match (x.0, x.1) {
            (Ok(result), Ok(file)) => {
                info!("Got result and file.");
                info!("Result: {:?}", result.url());
                info!("File: {:?}", file);
                (result, file)
            }
            _ => {
                error!("Failed to get result and file.");
                return;
            }
        };
        // Write bytes to file
        let _ = write_bytes_to_file(file, result).await;
    };

    // Map result and file to store_result
    let _ = results
        .into_iter()
        .zip(files)
        .map(|x| async {
            debug!("Storing result: {:?}, {:?}", x.0, x.1);
            let _ = store_result(x).await;
        })
        .collect::<Vec<_>>();

    info!("Batch download done.");

The future::join_all(tasks).await; call executes correctly. What's wrong with the code?

This code doesn't do what you expect. Futures in Rust are lazy: they do nothing until they are polled, which happens when you await them.


Do you mean I need to wait for the collected let _ variable?

No, you can only await futures. What I mean is that each of the futures within that collected vector isn't doing anything since it's not being polled. What you need to do is the following:

    // This is a Vec of futures; none of them has run yet
    let store_results_futures = results
        .into_iter()
        .zip(files)
        .map(|x| async {
            debug!("Storing result: {:?}, {:?}", x.0, x.1);
            let _ = store_result(x).await;
        })
        .collect::<Vec<_>>();

    // You need to drive those futures to completion, e.g. by awaiting join_all
    let _ = future::join_all(store_results_futures).await;
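
If it helps to see the laziness in isolation, here is a minimal sketch that uses only the standard library. The `NoopWaker`, `block_on`, and `demo` names are hypothetical helpers made up for this illustration (real code would rely on its runtime, e.g. the executor you already use, rather than a hand-rolled busy-polling loop), and the counter stands in for your `store_result` side effect.

```rust
use std::cell::Cell;
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical waker that does nothing; enough for futures that never pend.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Hypothetical minimal executor for this demo only: polls in a loop until ready.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

// Returns the counter value (before polling, after polling).
fn demo() -> (usize, usize) {
    let stored = Cell::new(0usize);

    // Same shape as `.map(|x| async { ... }).collect()` in the question:
    // this only builds the futures, none of their bodies has run.
    let futures: Vec<_> = (0..3)
        .map(|_| async { stored.set(stored.get() + 1) })
        .collect();
    let before = stored.get();

    // Awaiting each future is what actually runs its body.
    block_on(async {
        for f in futures {
            f.await;
        }
    });

    (before, stored.get())
}

fn main() {
    let (before, after) = demo();
    println!("before polling: {before}, after polling: {after}");
}
```

The counter stays at 0 after `collect()` and only reaches 3 once the futures are awaited, which is the same reason your `store_result` calls never ran.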

Ahh, feels like déjà vu. I'll try this out.

Works like a charm. Thanks!
