I have an iterator whose values I would like to process:
- Concurrently.
- Lazily. Only calling `iterator.next()` when there's a thread ready to process an item.
- Cancelable. It should be easy to cancel processing before the iterator is exhausted.
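One way to check the "lazily" requirement is to wrap the source iterator and count how many times `next()` is called. The sketch below assumes a hypothetical helper named `CountingIter`, which is not part of std or rayon.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Hypothetical helper: wraps an iterator and counts calls to `next()`,
/// so a test can observe how far the source has been advanced.
pub struct CountingIter<I> {
    inner: I,
    calls: Arc<AtomicUsize>,
}

impl<I> CountingIter<I> {
    /// Returns the wrapper plus a shared handle to its call counter.
    pub fn new(inner: I) -> (Self, Arc<AtomicUsize>) {
        let calls = Arc::new(AtomicUsize::new(0));
        (CountingIter { inner, calls: Arc::clone(&calls) }, calls)
    }
}

impl<I: Iterator> Iterator for CountingIter<I> {
    type Item = I::Item;

    fn next(&mut self) -> Option<I::Item> {
        // Record the call before delegating to the wrapped iterator.
        self.calls.fetch_add(1, Ordering::SeqCst);
        self.inner.next()
    }
}
```

Because the wrapper is `Send` whenever the inner iterator is, it can also wrap the iterator passed to a function like `process_items_with_threads`, letting you compare how many items were pulled with how many results were received.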
My current code looks like this:
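```rust
use std::sync::mpsc::{self, Receiver};
use std::sync::{Arc, Mutex};
use std::thread;

pub fn process_items_with_threads<I>(iterator: I) -> Receiver<String>
where
    I: Iterator<Item = String> + Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    // Workers share the iterator; the mutex ensures only one calls next() at a time.
    let items = Arc::new(Mutex::new(iterator));
    for _ in 0..num_cpus::get() {
        let items = items.clone();
        let tx = tx.clone();
        thread::spawn(move || loop {
            // The lock is held only while pulling the next item, not while processing it.
            let item = match items.lock().unwrap().next() {
                Some(item) => item,
                None => return, // iterator exhausted
            };
            let processed_item = format!("Processed {}", item);
            // A send error means the Receiver was dropped, so stop (cancellation).
            if tx.send(processed_item).is_err() {
                return;
            }
        });
    }
    rx
}
```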
Items are processed concurrently. The iterator's `next()` is only called when a thread needs work. I can cancel the whole thing by dropping the returned `Receiver`. Yeah!
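The cancellation claim can be checked directly by also returning the worker handles and joining them after the receiver is dropped: if the workers did not notice the disconnect, the join would hang. This is a hypothetical variant (`process_items_joinable` is an illustrative name), and it swaps in std's `thread::available_parallelism` for the `num_cpus` crate so it runs with std alone.

```rust
use std::sync::mpsc::{self, Receiver};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

/// Hypothetical variant of `process_items_with_threads` that also returns the
/// worker handles, so a caller can confirm the workers stop after cancellation.
pub fn process_items_joinable<I>(iterator: I) -> (Receiver<String>, Vec<JoinHandle<()>>)
where
    I: Iterator<Item = String> + Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    let items = Arc::new(Mutex::new(iterator));
    // std replacement for num_cpus::get(); falls back to one worker on error.
    let workers = thread::available_parallelism().map(|n| n.get()).unwrap_or(1);
    let handles: Vec<JoinHandle<()>> = (0..workers)
        .map(|_| {
            let items = Arc::clone(&items);
            let tx = tx.clone();
            thread::spawn(move || loop {
                // The guard is a temporary of this `let`, so it is released
                // before the item is processed.
                let item = match items.lock().unwrap().next() {
                    Some(item) => item,
                    None => return,
                };
                if tx.send(format!("Processed {}", item)).is_err() {
                    return; // receiver dropped: cancel
                }
            })
        })
        .collect();
    (rx, handles)
}
```

With an infinite source such as `(0u64..).map(|i| i.to_string())`, the workers can only exit through the failed `send`, so a successful `join` after `drop(rx)` shows that dropping the receiver is enough to stop them.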
I'm now trying to implement the same thing with rayon. This is what I have so far:
```rust
use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::{Arc, Mutex};

pub fn process_items_with_rayon<I>(iterator: I) -> Receiver<String>
where
    I: Iterator<Item = String> + Send + 'static,
{
    let items = Arc::new(Mutex::new(iterator));
    let (tx, rx) = mpsc::channel();

    // Each task processes one item, then re-spawns itself to fetch the next one.
    fn process_next_item<I>(items: Arc<Mutex<I>>, results: Sender<String>)
    where
        I: Iterator<Item = String> + Send + 'static,
    {
        let item = match items.lock().unwrap().next() {
            Some(item) => item,
            None => return, // iterator exhausted: this chain of tasks ends
        };
        let processed_item = format!("Processed {}", item);
        // Only queue the next task if the Receiver is still alive.
        if results.send(processed_item).is_ok() {
            rayon::spawn(move || process_next_item(items, results));
        }
    }

    // Start one chain of tasks per CPU.
    for _ in 0..num_cpus::get() {
        let items = items.clone();
        let tx = tx.clone();
        rayon::spawn(move || {
            process_next_item(items, tx);
        });
    }
    rx
}
```
Sanity check... am I on the right path here?
The rayon version generally seems to work. It's a little slower, but it's nice that it uses a thread pool: if the function is called in quick succession, it won't create a ton of new threads the way the original version does.
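To make the thread-pool point concrete without depending on rayon, here is a std-only sketch of the same self-rescheduling pattern running on a fixed set of worker threads. `Pool` and `process_items_on_pool` are hypothetical names, and this single shared queue is not how rayon is implemented (rayon schedules work with work-stealing deques); it only shows why repeated calls reuse the same threads instead of spawning new ones.

```rust
use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread;

type Job = Box<dyn FnOnce() + Send + 'static>;

/// Hypothetical minimal pool: a fixed set of worker threads draining one job queue.
#[derive(Clone)]
pub struct Pool {
    jobs: Sender<Job>,
}

impl Pool {
    pub fn new(workers: usize) -> Pool {
        let (jobs, queue) = mpsc::channel::<Job>();
        let queue = Arc::new(Mutex::new(queue));
        for _ in 0..workers {
            let queue = Arc::clone(&queue);
            thread::spawn(move || loop {
                // Take one job; the lock is released at the end of this `let`.
                let job = match queue.lock().unwrap().recv() {
                    Ok(job) => job,
                    Err(_) => return, // every Pool handle dropped: shut down
                };
                job();
            });
        }
        Pool { jobs }
    }

    pub fn spawn<F: FnOnce() + Send + 'static>(&self, f: F) {
        // Sending only fails if every worker has exited; ignored in this sketch.
        let _ = self.jobs.send(Box::new(f));
    }
}

/// Same shape as the question's process_next_item: handle one item,
/// then queue a task for the next one.
fn process_next_item<I>(pool: Pool, items: Arc<Mutex<I>>, results: Sender<String>)
where
    I: Iterator<Item = String> + Send + 'static,
{
    let item = match items.lock().unwrap().next() {
        Some(item) => item,
        None => return,
    };
    if results.send(format!("Processed {}", item)).is_ok() {
        let next_pool = pool.clone();
        pool.spawn(move || process_next_item(next_pool, items, results));
    }
}

/// Starts `tasks` chains of work on an existing pool; no threads are created here.
pub fn process_items_on_pool<I>(pool: &Pool, tasks: usize, iterator: I) -> Receiver<String>
where
    I: Iterator<Item = String> + Send + 'static,
{
    let items = Arc::new(Mutex::new(iterator));
    let (tx, rx) = mpsc::channel();
    for _ in 0..tasks {
        let pool_handle = pool.clone();
        let items = Arc::clone(&items);
        let tx = tx.clone();
        pool.spawn(move || process_next_item(pool_handle, items, tx));
    }
    rx
}
```

Calling `process_items_on_pool` several times with the same `Pool` reuses its workers, and each returned `Receiver` finishes once its iterator is exhausted and every task holding a `Sender` has ended.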
But the rayon version is more complex, and I'm not sure I'm using rayon to its full advantage. Is there a cleaner way (or a different crate) to do what I'm trying to do?