Rayon sequential dependency between parallel calculations

Rayon's parallel iterators work great when each calculation is independent of every other calculation. For example, we can multiply each number in an array by two and then sum() the products; summation is an example of an associative operation, the kind we can also use with reduce_with().

use rayon::prelude::*;

let data: Vec<i32> = vec![1, 2, 3, 4];
let result: i32 = data.par_iter().map(|x| x * 2).sum();
assert_eq!(result, 1*2 + 2*2 + 3*2 + 4*2);

This gets trickier if we need a sequential dependency between calculations. Suppose we want to multiply each number by two, take the difference with the previous product, and then sum the results. This sort of operation is very common, for example when computing a derivative or convolving with some kernel.
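For reference, here is that computation written sequentially with only the standard library. slice::windows(2) yields each overlapping pair, which is exactly the dependency on the previous element; the function name is just for illustration:

```rust
/// Sequential reference: double each element, take the difference between
/// each product and the previous one, then sum those differences.
fn doubled_differences_sum(data: &[i32]) -> i32 {
    let products: Vec<i32> = data.iter().map(|x| x * 2).collect();
    products.windows(2).map(|w| w[1] - w[0]).sum()
}
```

Because the differences telescope, this toy example always equals 2 * (last - first) for two or more elements; a real derivative or convolution kernel would not collapse like that, so the examples here keep the general form.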

Solution: There is no way to express this kind of sequential dependency in Rayon as of now. There is a feature request to convert parallel iterators to sequential iterators without collect() that would make this possible, but it dates back to 2017 and is not likely to be merged any time soon.

If your first operation distributes over the difference (scalar multiplication does, since 2*y - 2*x == 2*(y - x)) and you already have a slice of the data in memory:

use rayon::prelude::*;

let data: Vec<i32> = vec![1, 2, 3, 4];
let result: i32 = data.par_windows(2) // requires a slice; Vec<i32> derefs to [i32]
    .map(|v| 2 * (v[1] - v[0]))
    .sum();

Or, if you don't have the slice in memory, you can use a sequential iterator adaptor like Itertools::tuple_windows() to feed a parallel iterator via par_bridge():

use itertools::Itertools;
use rayon::prelude::*;

let data: Vec<i32> = vec![1, 2, 3, 4];
let data = data.into_iter(); // don't need all data in memory at once
let result: i32 = data.tuple_windows::<(_, _)>()
    .par_bridge()
    .map(|(x, y)| 2 * (y - x))
    .sum();

If you can't distribute the first operation, or you don't have all the data in memory, then it's about trade-offs. If you're memory constrained, be prepared to do the first op serially, or redundantly in parallel:

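// `data` is a sequential iterator of i32 as above; pick one option, since each consumes it.
// Option 1: serial scalar multiplication, then parallel difference and sum.
let result: i32 = data.map(|x| x * 2).tuple_windows::<(_, _)>().par_bridge().map(|(x, y)| y - x).sum();
// Option 2: everything's parallel, but each element is multiplied twice (once per window it appears in).
let result: i32 = data.tuple_windows::<(_, _)>().par_bridge().map(|(x, y)| (2 * x, 2 * y)).map(|(x, y)| y - x).sum();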

If the first op is computationally expensive and you're not memory constrained, then collect() partway through, like this:

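// Parallel multiplication, collected into an intermediate Vec...
let products: Vec<i32> = data.par_iter().map(|x| x * 2).collect();
// ...then parallel differences over overlapping windows of the products.
let result: i32 = products.par_windows(2).map(|v| v[1] - v[0]).sum();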

See also https://users.rust-lang.org/t/parallel-work-collected-sequentially/13504.

Thanks to @dthul for pointing out ParallelSlice::par_windows() and @drewkett for pointing out Itertools::tuple_windows(), which make these concise workarounds possible. The rest of my original post continues below for posterity.

The obvious (to me) solution is to perform the first operation (multiplication) in parallel, then collect the results, and then perform the dependent (difference) operation in parallel:

use rayon::prelude::*;

let data: Vec<i32> = vec![1, 2, 3, 4];
let products: Vec<i32> = data.par_iter().map(|x| x * 2).collect();
let result: i32 = products.iter().zip(products.iter().skip(1)).par_bridge().map(|(x, y)| y - x).sum();
assert_eq!(result, (2*2 - 1*2) + (3*2 - 2*2) + (4*2 - 3*2));

However, imagine the data are very long and actually come from a lazily-evaluated iterator instead of a vector. Collecting them into the intermediate products vector could cost a lot of memory! Is there a more efficient/idiomatic way to achieve this sort of non-associative reduction, or to deal with sequential dependencies between parallel calculations in general?

For this specific situation, you could try something like this:

use rayon::prelude::*;

let result: i32 = [1, 2, 3, 4]
    .par_windows(2)
    .map(|v| 2 * (v[1] - v[0]))
    .sum();

@dthul, thanks for pointing out ParallelSlice::par_windows(). That's a lot more succinct than using zip() in my original example. However, it still only works when all the data are in memory at once such as in a vector, which can be sliced. This could be problematic if the data are very large (e.g. gigabytes). I suspect what you have shown is the best solution right now, but I'm still curious if there is a way to do this where the data come from a more memory-efficient consuming iterator.

let data: Vec<i32> = vec![1, 2, 3, 4];
let data = data.into_iter(); // consuming iterator
let result: i32 = data.par_windows(2) // won't work, data isn't a slice
    .map(|v| 2 * (v[1] - v[0]))
    .sum();

You should be able to use tuple_windows from itertools to generate an iterator with pairs and then convert that to a parallel iterator for reduction.

Maybe I misunderstand, but in the limit of very long data, isn't that particular computation completely memory bound, so there isn't much to gain from (Rayon) parallelization? SIMD might give the best results, or parallelizing the generation of the data rather than this arithmetic.

Thank you @drewkett, the tuple_windows() combinator does indeed work on consuming iterators, but not on consuming parallel iterators. Imagine that instead of multiplying by two we did some very expensive computation, which we would like to speed up by doing it in parallel, followed by a non-associative reduction (here computing the difference), which we would also like to do in parallel. With your tuple_windows() example we have to do the multiplication twice for nearly every element, once in each window it appears in. If only there were a way to go from a parallel iterator to an ordinary iterator without collect()ing the results... but it looks like this feature doesn't exist yet.

@bluss, I don't think this is memory bound; it may be I/O bound if the data is streaming from a TCP socket or from the filesystem. As long as the data itself comes from some consuming iterator, we need at least roughly 2 * size_of::<i32>() * n bytes of memory, where n is the number of threads, plus a little more for plumbing. But we should be able to avoid using data.len() * size_of::<i32>() bytes of memory all at once.
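The arithmetic in that estimate, as a small std-only sketch; the thread count of 8 and the input length of one billion values are arbitrary example figures, not measurements, and the function names are hypothetical:

```rust
use std::mem::size_of;

/// Rough lower bound from the estimate above: one window of two i32 values per thread.
fn streaming_bytes(threads: usize) -> usize {
    2 * size_of::<i32>() * threads
}

/// Memory needed to materialize the whole input at once.
fn collected_bytes(len: u64) -> u64 {
    len * size_of::<i32>() as u64
}
```

So with 8 threads the windows themselves need about 64 bytes, against roughly 4 GB to collect a billion i32 values; the overhead of the plumbing comes on top and is not modeled here.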