Sanity check: Code to concurrently process items from iterator in rayon

I have an iterator whose values I would like to process:

  1. Concurrently.
  2. Lazily. Only calling iterator.next() when there's a thread ready to process an item.
  3. Cancelable. Easy to cancel processing before the iterator is exhausted.

My current code looks like this:

use std::sync::mpsc::{self, Receiver};
use std::sync::{Arc, Mutex};
use std::thread;

pub fn process_items_with_threads<I>(iterator: I) -> Receiver<String>
where
    I: Iterator<Item = String> + Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    let items = Arc::new(Mutex::new(iterator));
    for _ in 0..num_cpus::get() {
        let items = items.clone();
        let tx = tx.clone();
        thread::spawn(move || loop {
            let item = match items.lock().unwrap().next() {
                Some(item) => item,
                None => return,
            };

            let processed_item = format!("Processed {}", item);

            if tx.send(processed_item).is_err() {
                return;
            }
        });
    }
    rx
}

Items are processed concurrently. Iterator .next() is only called when a thread needs work. I can cancel the whole thing by dropping the returned Receiver. Yeah!


I'm now trying to implement the same thing in rayon.

This is what I have so far:

use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::{Arc, Mutex};

pub fn process_items_with_rayon<I>(iterator: I) -> Receiver<String>
where
    I: Iterator<Item = String> + Send + 'static,
{
    let items = Arc::new(Mutex::new(iterator));
    let (tx, rx) = mpsc::channel();

    fn process_next_item<I>(items: Arc<Mutex<I>>, results: Sender<String>)
    where
        I: Iterator<Item = String> + Send + 'static,
    {
        let item = match items.lock().unwrap().next() {
            Some(item) => item,
            None => return,
        };

        let processed_item = format!("Processed {}", item);

        if results.send(processed_item).is_ok() {
            rayon::spawn(move || process_next_item(items, results));
        }
    }

    for _ in 0..num_cpus::get() {
        let items = items.clone();
        let tx = tx.clone();
        rayon::spawn(move || {
            process_next_item(items, tx);
        });
    }

    rx
}

Sanity check... am I on the right path here?

Generally the rayon version seems to be working. It's a little slower, but nice because it's using a thread pool, so if process_items is called in quick succession it won't create a ton of threads like the original version does.

But the rayon version is more complex and I'm not really sure that I'm using rayon to its full advantage. Is there another cleaner way (or different crate) to do what I'm trying to do?

I assume you're not just using par_iter() because it doesn't meet some of your criteria? (maybe not cancelable, I'm unsure).

If that's the problem, you might consider adding cancellation into the iterator chain, something like .filter(|_| keep_going).fuse().into_par_iter(...).
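One way to picture the "cancellation in the iterator chain" idea is a wrapper that checks a shared flag before each pull. This is only a sketch: the helper name `cancelable` and the use of an `AtomicBool` are assumptions for illustration, not anything provided by rayon or std. It uses `std::iter::from_fn` rather than `filter` so that the inner iterator is not advanced at all once the flag is cleared.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Hypothetical helper (not part of rayon or std): yields items from `inner`
/// while `keep_going` is true, and ends the iteration once it is false.
pub fn cancelable<I>(mut inner: I, keep_going: Arc<AtomicBool>) -> impl Iterator<Item = I::Item>
where
    I: Iterator,
{
    std::iter::from_fn(move || {
        // Check the flag before pulling, so no item is fetched after cancellation.
        if keep_going.load(Ordering::Relaxed) {
            inner.next()
        } else {
            None
        }
    })
    // Fuse so the iterator keeps returning None even if the flag is set again.
    .fuse()
}
```

Any thread holding a clone of the flag can call `keep_going.store(false, Ordering::Relaxed)` to stop the stream; the wrapped iterator can then be handed to the parallel side unchanged.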

As for alternate approaches and crates, with crossbeam you could have some workers started as threads. Each of these reads items from one channel and writes results to another. The iterator is fed to the input channel, which can be bounded with depth 0 to enforce the lazy iteration. The threads exit if the output channel is disconnected, which you do by dropping the receiver as in your original code.

This probably gets rid of all your Arc<Mutex<>> as well, unless that's necessary for other reasons.
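A rough sketch of that worker layout, written against std only so it stands alone. Assumptions to note: `std::sync::mpsc::sync_channel(0)` stands in for a zero-capacity crossbeam channel, and because the std receiver cannot be cloned it is still shared behind an `Arc<Mutex<..>>` here; a multi-consumer channel such as crossbeam's would let each worker hold its own receiver clone instead. The function name and the `workers` parameter are made up for the example.

```rust
use std::sync::mpsc::{self, Receiver};
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical function: feeds `iterator` to `workers` threads through a
/// rendezvous channel and returns the receiver for processed results.
pub fn process_items_with_workers<I>(iterator: I, workers: usize) -> Receiver<String>
where
    I: Iterator<Item = String> + Send + 'static,
{
    // Capacity 0 makes send() block until a worker receives, so the feeder
    // holds at most one item that no worker has asked for yet.
    let (in_tx, in_rx) = mpsc::sync_channel::<String>(0);
    let (out_tx, out_rx) = mpsc::channel();
    // std's Receiver is single-consumer, so the workers share it via a Mutex.
    let in_rx = Arc::new(Mutex::new(in_rx));

    for _ in 0..workers {
        let in_rx = Arc::clone(&in_rx);
        let out_tx = out_tx.clone();
        thread::spawn(move || loop {
            // The lock guard is released at the end of this statement.
            let next = in_rx.lock().unwrap().recv();
            let item = match next {
                Ok(item) => item,
                Err(_) => return, // feeder is done and has dropped its sender
            };
            if out_tx.send(format!("Processed {}", item)).is_err() {
                return; // the caller dropped the result receiver: cancel
            }
        });
    }

    // Feeder thread: stops as soon as every worker has gone away.
    thread::spawn(move || {
        for item in iterator {
            if in_tx.send(item).is_err() {
                return;
            }
        }
    });

    out_rx
}
```

Dropping the returned receiver makes each worker's next `send` fail; once all workers have exited, the input receiver is dropped too, which in turn stops the feeder.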

Rayon also has ParallelBridge for this use case.


Er, yeah, that's what I was thinking of when I said into_par_iter(), sorry.

@cuviper @dcarosone Thanks for the pointer to ParallelBridge. I didn't know about it, and it seems to work well.

But I'm still not sure how to stream those processed results to a channel so that I can start receiving results before they have all been processed. And also so that I can cancel processing by dropping that channel receiver.

Right now there is a collect_into_vec method. Is something like a collect_into_sender method possible? It might work like this:

let (tx, rx) = channel();
iterator
    .par_bridge()
    .map(|each| format!("Processed {}", each))
    .collect_into_sender(tx);

// able to get some value right away before all values are processed
rx.recv();

// drop receiver and rayon stops processing values
drop(rx);

You can use .try_for_each_with(tx, |tx, item| tx.send(item)), and wrap it all in spawn to run asynchronously so you can receive items as they're processed. Use scope+spawn if it needs to borrow local data, putting the rx.recv() in the scope too, because the scope won't end until all spawns are done.
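The point about putting the receive loop inside the scope can be illustrated with `std::thread::scope`, which likewise does not return until everything spawned in it has finished. To be clear, this is a std stand-in and not rayon code; the function name `process_borrowed` and its parameters are invented for the example. The worker borrows local data, and the caller drains the channel inside the scope so results are handled as they arrive rather than after all work is done.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical example: processes `items` using a borrowed `prefix`,
/// receiving results while the worker is still running.
pub fn process_borrowed(items: &[String], prefix: &str) -> Vec<String> {
    let (tx, rx) = mpsc::channel();
    let mut received = Vec::new();

    thread::scope(|s| {
        // The worker borrows `items` and `prefix` from the caller and owns `tx`.
        s.spawn(move || {
            for item in items {
                if tx.send(format!("{} {}", prefix, item)).is_err() {
                    return; // receiver is gone, stop early
                }
            }
            // `tx` is dropped when the worker ends, which ends the loop below.
        });

        // Receiving inside the scope: if this loop came after the scope,
        // it would only start once the worker had already finished.
        for processed in rx {
            received.push(processed);
        }
    });

    received
}
```

With a single worker the results come back in input order; with several workers sharing the input, the order would depend on scheduling.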


Also, keep in mind that the input side of par_bridge() is serialized, so I'd suggest keeping that as minimal as you can. Put your computational map/filter/etc. on the parallel side as much as possible.

Wow, that's awesome and does exactly what I want. Thanks!

And it was all right there in the docs, even with channels as an example use case :frowning:. I hadn't come across the try pattern for iterators yet, so those methods didn't stand out to me. And after not being able to do what I wanted with the default map, I got sidetracked trying to work with rayon at a lower level.

For anyone following along, here's a much shorter way to do what I was trying to do in the original post:

use rayon::prelude::*;
use std::sync::mpsc::{self, Receiver};

pub fn process_items<I>(iterator: I) -> Receiver<String>
where
    I: Iterator<Item = String> + Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    rayon::spawn(|| {
        let _ = iterator
            .par_bridge()
            .try_for_each_with(tx, |tx, item| tx.send(format!("Processed {}", item)));
    });
    rx
}