I need to run an IO-bound task for each item in a large vector (58000 elements). Here's my main function so far:
```rust
#[tokio::main]
async fn main() {
    let database = Database::open();
    let s3client = S3Client::new().await;
    let photos = database.select();
    let len = photos.len();
    for (i, photo) in photos.iter().enumerate() {
        println!("[{}/{} ({}%)] {} ...", i, len, ((i as f32 / len as f32) * 100.0).round() as i32, photo.url);
        fetch_and_assign_exif_data(&photo, &s3client, &database).await;
    }
}
```
I want to improve performance by processing multiple items at a time using a worker pool. After some research and prototyping, I found rayon suggested, but mixing it with async has been less than pleasant. I also tried the futures crate, without success either. What's the best approach here?
Spawning async tasks in the loop is probably the best first approximation.
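```rust
use std::sync::Arc;

#[tokio::main]
async fn main() {
    // `tokio::spawn` needs `'static` futures, so share the clients through `Arc`
    // instead of borrowing them from `main`.
    let database = Arc::new(Database::open());
    let s3client = Arc::new(S3Client::new().await);
    let photos = database.select();
    let len = photos.len();
    let mut handles = Vec::with_capacity(len);
    for (i, photo) in photos.into_iter().enumerate() {
        let (database, s3client) = (Arc::clone(&database), Arc::clone(&s3client));
        // `async move` gives each task ownership of its photo and its Arc clones.
        handles.push(tokio::task::spawn(async move {
            println!("[{}/{} ({}%)] {} ...", i, len, ((i as f32 / len as f32) * 100.0).round() as i32, photo.url);
            fetch_and_assign_exif_data(&photo, &s3client, &database).await;
        }));
    }
    // Without this, `main` returns (shutting down the runtime) before the tasks finish.
    for handle in handles {
        handle.await.expect("task panicked");
    }
}
```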
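I haven't tried it (I'm on an iPad at the moment), but something like this is a good starting point. You may wish to limit concurrency, for example by keeping at most N tasks in a `JoinSet` at a time (a `LocalSet` is the tool for futures that are not `Send`).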
Another way to write a similar thing would be to use the `futures` crate:
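```rust
use futures::stream::{FuturesUnordered, StreamExt};

#[tokio::main]
async fn main() {
    let database = Database::open();
    let s3client = S3Client::new().await;
    let photos = database.select();
    let len = photos.len();
    // Shadow with references so each `async move` block copies the reference
    // instead of trying to move the clients themselves.
    let (database, s3client) = (&database, &s3client);
    photos
        .iter()
        .enumerate()
        .map(|(i, photo)| async move {
            println!("[{}/{} ({}%)] {} ...", i, len, ((i as f32 / len as f32) * 100.0).round() as i32, photo.url);
            fetch_and_assign_exif_data(photo, s3client, database).await;
        })
        .collect::<FuturesUnordered<_>>()
        // `FuturesUnordered` is a `Stream`, not a `Future`: drive it to completion.
        .collect::<Vec<()>>()
        .await;
}
```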
or using streams:
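```rust
use futures::stream::{self, StreamExt};

#[tokio::main]
async fn main() {
    let database = Database::open();
    let s3client = S3Client::new().await;
    let photos = database.select();
    let len = photos.len();
    // References are Copy, so each `async move` block can capture them.
    let (database, s3client) = (&database, &s3client);
    stream::iter(&photos)
        .enumerate()
        // `.then` would await each future before starting the next (sequential);
        // `map` + `buffer_unordered` keeps up to 16 (an arbitrary limit) in flight.
        .map(|(i, photo)| async move {
            println!("[{}/{} ({}%)] {} ...", i, len, ((i as f32 / len as f32) * 100.0).round() as i32, photo.url);
            fetch_and_assign_exif_data(photo, s3client, database).await;
        })
        .buffer_unordered(16)
        .collect::<Vec<()>>()
        .await;
}
```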