Several requests in parallel with Rust

I'm trying to learn async rust by trying to do a bunch of requests at once but what I've got only works sequentially. My goal is to print the UrlStatus as it goes and do as many of them at the same time as possible.

use reqwest;
use serde::Serialize;

#[derive(Serialize)]
struct UrlStatus<'a> {
    url: &'a str,
    status: Option<u16>,
}

async fn check(url : &str) {
    match reqwest::get(url).await {
        Ok(response) => {
            let status = UrlStatus {
                url,
                status: Some(response.status().as_u16()),
            };
            println!("{}", serde_json::to_string(&status).unwrap());
        }
        Err(_) => {
            let error = UrlStatus {
                url,
                status: None,
            };
            eprintln!("{}", serde_json::to_string(&error).unwrap());
        }
    }
}


#[tokio::main]
async fn main() {
    let urls = vec!["https://httpbin.org/status/200"; 10];

    for url in urls {
        let handle = tokio::spawn(check(url));
        handle.await.unwrap();
    }
}

After a bit of googling, I encountered https://stackoverflow.com/a/51047786 and ended up with:

use reqwest;
use futures::StreamExt;
use serde::Serialize;

#[derive(Serialize)]
struct UrlStatus<'a> {
    url: &'a str,
    status: Option<u16>,
}



#[tokio::main]
async fn main() {
    let urls = vec!["https://httpbin.org/status/200"; 10];

    let client = reqwest::Client::new();

    let bodies = futures::stream::iter(urls)
        .map(|url| {
            let client = client.clone();
            tokio::spawn(async move {
                match client.get(url).send().await {
                    Ok(response) => {
                        let status = UrlStatus { url, status: Some(response.status().as_u16()) };
                        println!("{}", serde_json::to_string(&status).unwrap());

                    }
                    Err(_) => {
                        let status = UrlStatus { url, status: None };
                        eprintln!("{}", serde_json::to_string(&status).unwrap());
                    },
                }
            })
        })
        .buffer_unordered(10); // Automatically decide the best value for this?

    bodies.for_each(|_| async {}).await; // is for_each(|_| async {}) necessary?
}

This seems to work but there are some parts (I've added comments) that I don't understand.
I'm open to any feedback on improving my code in any way. Thank you.

If that's your only goal, the first code snippet is (almost) good enough. The biggest issue there is the handle.await.unwrap() in the for url in urls: it will prevent the next tokio::spawn from starting up until the handle.await is done. Collect and join all of them at once instead:

// `cargo add futures`
use futures::future::join_all;

async fn main() {
    let urls = vec!["https://httpbin.org/status/200"; 10];
    let mut checks = Vec::with_capacity(10);
    for url in urls {
        let handle = tokio::spawn(check(url));
        checks.push(handle);
    }
    join_all(checks).await;
1 Like

In the loop you're spawning a new async task and then immediately wait for it to finish by awaiting the join handle. You would need to memorize the join handle in a vec or something, and in a second loop await all handles in that vec.

Edit: Never mind @00100011 was faster than me :grinning_face:

2 Likes

For now, yes! I do plan to expand on this as I figure things out.

Your example worked wonderfully, thank you!