How to create parallel sequential iterators?


#1

Yep, I think I do need this oxymoron :slight_smile:

I have a sequence of things which I want to filter by some condition, taking the first five matching elements: xs.filter(condition).take(5).collect(). What would be the idiomatic way to make this code parallel? I think I can’t use rayon’s parallel iterators here, because rayon would split the sequence evenly, but it may be the case that all the interesting elements are near the beginning of the sequence. The precise problem I want to solve is http://adventofcode.com/2016/day/5 :slight_smile:
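
For reference, the sequential version looks roughly like this (condition here is just a placeholder for the real check):

fn condition(i: u64) -> bool {
    // placeholder for the actual expensive check
    i % 1_000_000 == 0
}

fn main() {
    // Walk the candidates in order and keep the first five matches.
    let first_five: Vec<u64> = (0u64..).filter(|&i| condition(i)).take(5).collect();
    println!("{:?}", first_five);
}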


#2

Given that the example has the interesting values separated by millions, I personally wouldn’t try to optimize for them being near the beginning. I suggest processing a manageable range at a time in parallel to completion, and just seeing how many matches you get from each batch.
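
Something like this, as a rough sketch with rayon (condition is just a placeholder for the expensive check, and the batch size is arbitrary):

extern crate rayon;
use rayon::prelude::*;

fn condition(i: u64) -> bool {
    // placeholder for the actual expensive check
    i % 1_000_000 == 0
}

fn main() {
    const BATCH: u64 = 1_000_000;
    const WANTED: usize = 5;

    let mut results = Vec::new();
    let mut start = 0u64;
    while results.len() < WANTED {
        // Run one batch to completion in parallel, then see how many matches it produced.
        // Rayon preserves the iteration order when collecting into a Vec, so the
        // matches come out in ascending order.
        let matches: Vec<u64> = (start..start + BATCH)
            .into_par_iter()
            .filter(|&i| condition(i))
            .collect();
        results.extend(matches);
        start += BATCH;
    }
    results.truncate(WANTED);
    println!("{:?}", results);
}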


#3

It seems that it’s not even possible to use take after filter in Rayon – in the current implementation, Take needs the underlying iterator to be indexable. (As you say, it’s quite hard to come up with an optimal filter implementation, since it depends heavily on the density and location of valid values.)

In your case, I’d just create a channel, spawn n threads, and let the k-th thread check only the numbers of the form i×n + k. Give the Sender to each thread and use the Receiver as an iterator! That may be a little racy: you’d need to somehow handle the situation where one thread runs ahead of another, but that could be done in post-processing, I guess?

The other slight disadvantage is that the worker threads will keep running until they notice the channel is gone (at their next .send()). You could add some kind of probing, but since the task is one-shot, the threads die with the process anyway, so it doesn’t really matter.
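
Here’s a rough sketch of that idea using std::sync::mpsc (expensive_check is a placeholder, n is an arbitrary thread count, and over-collecting at the end is the crude post-processing I mentioned):

use std::sync::mpsc;
use std::thread;

fn expensive_check(i: u64) -> bool {
    // placeholder for the actual expensive check
    i % 1_000_000 == 0
}

fn main() {
    let n: u64 = 4; // number of worker threads
    let wanted = 5;
    let (tx, rx) = mpsc::channel();

    for k in 0..n {
        let tx = tx.clone();
        thread::spawn(move || {
            // The k-th thread checks only the numbers of the form i*n + k.
            for i in 0u64.. {
                let candidate = i * n + k;
                if expensive_check(candidate) && tx.send(candidate).is_err() {
                    // The main thread dropped the Receiver; we're done.
                    return;
                }
            }
        });
    }
    drop(tx); // only the workers hold Senders now

    // Use the Receiver as an iterator. Taking a few extra results and then
    // sorting is a crude guard against one thread running ahead of another;
    // it is not a hard guarantee.
    let mut results: Vec<u64> = rx.iter().take(wanted + n as usize).collect();
    results.sort();
    results.truncate(wanted);
    println!("{:?}", results);
}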


#4

Thanks @cuviper! Processing in chunks works beautifully! If anyone is interested, here’s the code: https://gist.github.com/matklad/3dc7b75e2d2961e0ab7b18f97a6d6eee


#5

That’s… not such a trivial question, I think. I don’t know whether there’s a solution that would require only a couple of lines of code.

What I’d do would be to spawn a bunch of threads and coordinate through channels, like this:

#[macro_use]
extern crate chan;
extern crate num_cpus;
use std::thread;

// How many matching results we want.
const N: usize = 5;

// Stand-in for the real expensive check.
fn expensive_function(i: i32) -> bool {
    match i {
        1000..=1010 => true,
        _ => false,
    }
}

fn main() {
    let cpus = num_cpus::get_physical();
    // Work queue: main thread -> workers.
    let (tx, rx) = chan::sync(cpus);
    // Results: workers -> main thread.
    let (tx2, rx2) = chan::sync(cpus);
    for _ in 0..cpus {
        let rxp = rx.clone();
        let tx2p = tx2.clone();
        thread::spawn(move || loop {
            let i = rxp.recv().unwrap();
            // 0 is the shutdown sentinel.
            if i == 0 {
                return;
            }
            if expensive_function(i) {
                tx2p.send(i);
            }
        });
    }
    // Drop the main thread's result sender so rx2 closes once all workers exit.
    drop(tx2);
    let mut results = Vec::with_capacity(N + cpus);
    let mut i = 1;
    loop {
        chan_select! {
            // Either hand out the next candidate...
            tx.send(i) => {
                i += 1;
            },
            // ...or collect an answer some worker sent back.
            rx2.recv() -> r => {
                results.push(r.unwrap());
                if results.len() >= N {
                    break;
                }
            },
        }
    }
    // Tell every worker to shut down.
    for _ in 0..cpus {
        tx.send(0);
    }
    // Drain answers for candidates that were still in flight.
    while let Some(r) = rx2.recv() {
        results.push(r);
    }
    // Answers may arrive out of order, so sort and keep the first N.
    results.sort();
    results.truncate(N);
    println!("{:?}", results);
}

Note that this only makes sense if the “expensive_function” is indeed quite expensive. Otherwise the overhead of synchronizing through channels would not be worth it.

What the snippet above does is create one thread per core in your machine, each owning a receiving and a sending channel. The threads run everything they receive through expensive_function and only send back whatever passes the test. Meanwhile, the main thread is sending values to the threads and receiving their answers. Once it has received 5 answers (or however many you require), it stops sending inputs and waits for the answers of the work that is still in flight. It then sorts the received answers and truncates the vector to 5. This way you ensure that you get the first 5 results and don’t skip any. Note that if you don’t truncate, you may or may not get more than 5 results, but the first 5 results will always be included in the vector.