How to Run Asynchronous Jobs in a Worker Pool

I need to run an IO-bound task for each item in a large vector (58000 elements). Here's my main function so far:

#[tokio::main]
async fn main() {
	let database = Database::open();
	let s3client = S3Client::new().await;
	let photos = database.select();
	let len = photos.len();

	for (i, photo) in photos.iter().enumerate() {
		println!("[{}/{} ({}%)] {} ...", i, len, ((i as f32 / len as f32) * 100.0).round() as i32, photo.url);
		fetch_and_assign_exif_data(&photo, &s3client, &database).await;
	}
}

I want to improve performance by processing multiple items at a time using a worker pool. After some research and prototyping, one suggested solution was rayon, but mixing it with async has been less than pleasant. I also tried the futures crate, without success. What's the best approach here?

Spawning async tasks in the loop is probably the best first approximation.

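use std::sync::Arc;

#[tokio::main]
async fn main() {
    // Spawned tasks must be 'static, so share the clients through Arc
    // (this assumes Database and S3Client are Send + Sync).
    let database = Arc::new(Database::open());
    let s3client = Arc::new(S3Client::new().await);
    let photos = database.select();
    let len = photos.len();

    let mut handles = Vec::with_capacity(len);
    for (i, photo) in photos.into_iter().enumerate() {
        let database = Arc::clone(&database);
        let s3client = Arc::clone(&s3client);
        // async move gives each task ownership of its photo and Arc clones.
        handles.push(tokio::task::spawn(async move {
            println!("[{}/{} ({}%)] {} ...", i, len, ((i as f32 / len as f32) * 100.0).round() as i32, photo.url);
            fetch_and_assign_exif_data(&photo, &s3client, &database).await;
        }));
    }

    // Wait for every task; otherwise main returns and the runtime drops them.
    for handle in handles {
        handle.await.unwrap();
    }
}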

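I didn’t try it (on iPad at the moment), but something like this is a good starting point. Note that it starts a task for every photo at once; you may wish to limit concurrency, for example by capping how many tasks are in a JoinSet or by having each task hold a permit from a tokio::sync::Semaphore.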


Another way to write a similar thing would be to use the futures crate:

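use futures::stream::{FuturesUnordered, StreamExt};

#[tokio::main]
async fn main() {
    let database = Database::open();
    let s3client = S3Client::new().await;
    let photos = database.select();
    let len = photos.len();

    // Borrow once so each async move block copies these references
    // instead of trying to move the owned values out of the closure.
    let database = &database;
    let s3client = &s3client;

    photos
        .iter()
        .enumerate()
        .map(|(i, photo)| async move {
            println!("[{}/{} ({}%)] {} ...", i, len, ((i as f32 / len as f32) * 100.0).round() as i32, photo.url);
            fetch_and_assign_exif_data(photo, s3client, database).await;
        })
        // FuturesUnordered is a Stream, not a Future: drain it to await every item.
        .collect::<FuturesUnordered<_>>()
        .collect::<Vec<()>>()
        .await;
}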

or using streams:

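use futures::StreamExt;

#[tokio::main]
async fn main() {
    let database = Database::open();
    let s3client = S3Client::new().await;
    let photos = database.select();
    let len = photos.len();

    // Borrow once so the closure can hand a copy of each reference
    // to every async move block it creates.
    let database = &database;
    let s3client = &s3client;

    futures::stream::iter(&photos)
        .enumerate()
        // then waits for each future to finish before pulling the next item.
        .then(|(i, photo)| async move {
            println!("[{}/{} ({}%)] {} ...", i, len, ((i as f32 / len as f32) * 100.0).round() as i32, photo.url);
            fetch_and_assign_exif_data(photo, s3client, database).await;
        })
        .collect::<Vec<()>>()
        .await;
}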