Downloading files concurrently using Rust

(The original code is taken from "Downloading 100,000 Files Using Async Rust" by Pat Shaughnessy.)

I have some code that downloads images concurrently using Rust, but I'm not sure how to implement two things. The first is rate limiting, to avoid 429 (Too Many Requests) responses. I tried using std::thread::sleep, but it does not work as I would expect. (I did not write the original code and am new to async Rust, so I am unsure of the specifics of how and when the code runs.)

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let paths: Vec<String> = read_lines("links.txt")?;
    let fetches = futures::stream::iter(
    paths.into_iter().map(|path| {
        async move {
            let a = path.split('/').collect::<Vec<&str>>();
            let file_name = a.last().unwrap();
            std::thread::sleep(std::time::Duration::from_secs(1));
            match reqwest::get(&path).await {
                Ok(resp) => {
                    if resp.status().as_u16() != 200 {
                        println!("failed to download");
                        println!("{}", path);
                    };
                    match resp.bytes().await {
                        Ok(bytes) => {
                            //println!("RESPONSE: {} bytes from {}", (bytes.len()), path);
                            write(format!("downloads/{}", file_name), bytes).unwrap();
                        }
                        Err(_) => println!("ERROR reading {}", path),
                    }
                }
                Err(_) => println!("ERROR downloading {}", path),
            }
        }
    })
    ).buffer_unordered(200).collect::<Vec<()>>();
    fetches.await;
    Ok(())
}

Secondly, I want to keep a list of the links that were "rate limited" (returned a 429 status) and print it once all the other files have finished downloading. My attempt and the resulting compiler error are as follows:

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut failed: Vec<String> = Vec::new();
    let paths: Vec<String> = read_lines("links.txt")?;
    let fetches = futures::stream::iter(
    paths.into_iter().map(|path| {
        async move {
            let a = path.split('/').collect::<Vec<&str>>();
            let file_name = a.last().unwrap();
            match reqwest::get(&path).await {
                Ok(resp) => {
                    if resp.status().as_u16() != 200 {
                        println!("failed to download {}", path);
                        failed.push(path);
                        return // To avoid downloading the file, which will not contain what we want.
                    };
                    match resp.bytes().await {
                        Ok(bytes) => {
                            //println!("RESPONSE: {} bytes from {}", (bytes.len()), path);
                            write(format!("downloads/{}", file_name), bytes).unwrap();
                        }
                        Err(_) => println!("ERROR reading {}", path),
                    }
                }
                Err(_) => println!("ERROR downloading {}", path),
            }
        }
    })
    ).buffer_unordered(200).collect::<Vec<()>>();
    fetches.await;
    Ok(())
}
error[E0507]: cannot move out of `failed`, a captured variable in an `FnMut` closure
  --> src/main.rs:28:20
   |
24 |       let mut failed: Vec<String> = Vec::new();
   |           ---------- captured outer variable
...
28 |           async move {
   |  ____________________^
29 | |             let a = path.split('/').collect::<Vec<&str>>();
30 | |             let file_name = a.last().unwrap();
31 | |             match reqwest::get(&path).await {
...  |
35 | |                         failed.push(path);
   | |                         ------
   | |                         |
   | |                         move occurs because `failed` has type `Vec<String>`, which does not implement the `Copy` trait
   | |                         move occurs due to use in generator
...  |
47 | |             }
48 | |         }
   | |_________^ move out of `failed` occurs here

How can I do this?

async move {
    std::thread::sleep(std::time::Duration::from_secs(1));
    ...
    write(format!("downloads/{}", file_name), bytes).unwrap();
}

Stay away from blocking functions inside async blocks. They block the whole worker thread, which prevents every other async task on that thread from running until the call returns.

Tokio has async equivalents for sleep and write: tokio::time::sleep and tokio::fs::write.

Also, write(...).unwrap() is a bad habit. Give that Err some respect!

I see. Thank you, I'll make sure to use this from now on.

For failed, my thought is to throw it into a thread-safe container like Arc<Mutex<Vec<_>>>. Then, for each download, you can clone the Arc, move the clone into the async block, lock the mutex, and update the shared vector safely.

There is a tokio::sync::Mutex, but I think you can get away with using a std::sync::Mutex, since you won't be holding the lock across .await points. Any code locking the mutex will only hold it for a few cycles, given how fast failed.push(path) is.

Tokio's documentation says:

Contrary to popular belief, it is ok and often preferred to use the ordinary Mutex from the standard library in asynchronous code.

The feature that the async mutex offers over the blocking mutex is the ability to keep it locked across an .await point. This makes the async mutex more expensive than the blocking mutex, so the blocking mutex should be preferred in the cases where it can be used. The primary use case for the async mutex is to provide shared mutable access to IO resources such as a database connection. If the value behind the mutex is just data, it’s usually appropriate to use a blocking mutex such as the one in the standard library or parking_lot.

An alternative for the failed case would be to have the download closure return an Option<String>, where None indicates success and Some(path) indicates failure. Then you can collect them into a Vec, filtering out the Nones using filter_map.


Could you point me to somewhere I can find information on getting started with these types? I've never used them myself; I'm pretty new to async Rust and not entirely comfortable with Rust in general. Thanks for the reply, though.

I switched to tokio::time::sleep, and yet the rate limiting still doesn't behave as I expect. I think tokio::fs::write works fine, though.

To get your code working with failed: Arc<Mutex<Vec<…>>>, you can create it with Arc::new(Mutex::new(Vec::new())), or just Default::default(). Actually, you might not even need the Arc here; try whether just using Mutex<Vec<String>> works for you. To do this, remove the move from that async move, and failed.push(path) becomes failed.lock().unwrap().push(path).

More information about these types (Arc and Mutex) can be found, for example, in the book (The Rust Programming Language). It first has a chapter on smart pointers, spread over multiple pages, covering Rc and RefCell; those are useful to learn about, because they are the non-thread-safe analogs of Arc and Mutex. Arc and Mutex themselves are then covered in the chapter on concurrency.

