Use rayon's parallel iterator on any iterator without collecting first

I've got a process like this:

find_items_to_process_iter()
    // .into_par_iter() // not implemented in rayon :(
    // .collect().into_par_iter() // bad, waits too long
    .map(process_item)
    .collect::<Result<_>>()?

find_items_to_process_iter is I/O-bound and takes a long time to complete, while process_item is CPU-bound. Ideally I'd like items to be processed, in parallel, as soon as they're found, while the search is still running.

I guess I need some kind of thread pool? But if I just chuck processing to a pool, then how do I collect the results and check for errors?


You can try using futures_cpupool, which gives you a Future for each submitted task. You can then feed those futures to futures::stream::futures_unordered and work with the results as a Stream, handling errors in the process.
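A rough sketch of that shape, using the futures 0.1-era API (Output and Error stand in for whatever process_item actually returns, and exact signatures may differ between crate versions):

    extern crate futures;
    extern crate futures_cpupool;

    use futures::{Future, Stream};
    use futures::stream::futures_unordered;
    use futures_cpupool::CpuPool;

    fn run_all() -> Result<Vec<Output>, Error> {
        let pool = CpuPool::new_num_cpus();

        // spawn_fn hands each closure to the pool immediately and returns a
        // CpuFuture, so processing starts while the search iterator is still running.
        let tasks = find_items_to_process_iter()
            .map(|item| pool.spawn_fn(move || process_item(item)));

        // futures_unordered polls all tasks and yields results in completion
        // order as a Stream; collect() stops at the first error.
        futures_unordered(tasks).collect().wait()
    }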

I read that Tokio just landed support for blocking tasks (like disk I/O) in its latest release; maybe that could be useful to you?


I dunno if this helps your use case (sounds like it could fit), but I thought it was a cool idea that no one seemed to like :rofl:: the lazy_transducer crate.

Good luck!

You could use the Rayon threadpool like this (playground):

use std::sync::mpsc;

let (tx, rx) = mpsc::channel();

rayon::scope(move |scope| {
    for item in find_items_to_process_iter() {
        let tx = tx.clone();
        // Each item is handed to the Rayon pool as soon as the search yields it.
        scope.spawn(move |_| {
            // A send only fails if the receiver has been dropped, so ignore the result.
            let _ = tx.send(process_item(item));
        });
    }
    // The closure takes `tx` by move, so every sender has been dropped by the
    // time the scope finishes, and `rx.into_iter()` knows when to stop.
});

rx.into_iter().collect::<Result<Vec<_>, _>>()?

Note that the order of the results will be nondeterministic. If you want the results in order, you could .enumerate() the input iterator, and include the indices in the results so they can be sorted after collection.
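For example, modifying the snippet above (a sketch only, again assuming process_item returns a Result):

    // Sender side: tag each item with its position in the input.
    for (index, item) in find_items_to_process_iter().enumerate() {
        let tx = tx.clone();
        scope.spawn(move |_| {
            // Pair the index with the output so the order can be restored later.
            let _ = tx.send(process_item(item).map(|output| (index, output)));
        });
    }

    // Receiver side, after the scope: collect, then sort back into input order.
    let mut results = rx.into_iter().collect::<Result<Vec<_>, _>>()?;
    results.sort_by_key(|&(index, _)| index);
    let outputs: Vec<_> = results.into_iter().map(|(_, output)| output).collect();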


The latest version of Rayon has a built-in feature for this: the ParallelBridge trait, whose par_bridge() method turns any Send iterator with Send items into a parallel iterator.
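Applied to the original example, that looks roughly like this (assuming process_item returns a Result):

    use rayon::prelude::*;

    let results = find_items_to_process_iter()
        .par_bridge()          // feed items to the Rayon pool as they are produced
        .map(process_item)     // CPU-bound work runs in parallel
        .collect::<Result<Vec<_>, _>>()?;

Like the channel-based approach above, par_bridge() does not guarantee that the results come back in the input order.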
