Help for my parallel sum

Hi all,

I'm writing my next dev.to article but I can get out of my lifetime issues :frowning:

My goal is to use threads to calculate the sum of a vector using threads:

use std::iter::Sum;
use std::marker::Send;
use std::ops::Range;
use std::sync::mpsc;
use std::sync::mpsc::{Receiver, Sender};
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use std::ops::Add;
use std::ops::AddAssign;
use std::fmt::Debug;

pub trait Chunkable {
    // splits a Range into a vector of ranges at least of length chunk_size
    fn chunks(&self, chunk_size: usize) -> Vec<Range<usize>>;

    // divide a range into a vector of a maximum of n sub-ranges
    fn split_into(&self, n: usize) -> Vec<Range<usize>>;
}

impl Chunkable for Range<usize> {
    fn chunks(&self, chunk_size: usize) -> Vec<Range<usize>> {
        // create a vector from range containing all ranges
        // as the collect will contain refs, need to clone
        let tmp: Vec<_> = self.clone().collect();

        // empty vector will contain our ranges
        let mut v = Vec::new();

        // as we have now a vector, we can use the chunks() slice method
        // chunks will contain a vector a vectors
        let chunks = tmp.chunks(chunk_size);

        // convert each individual chunk into a range
        for chunk in chunks {
            println!("{:?}", chunk);
            v.push(chunk[0]..*chunk.last().unwrap() + 1);
        }

        v
    }

    fn split_into(&self, n: usize) -> Vec<Range<usize>> {
        self.chunks(self.len() / n)
    }
}

pub trait ParallelSum<T> {
    fn parallel_sum<'a>(&self, nb_threads: usize) -> T
    where
        T: 'a + Sum<&'a T> + Send + Sync + Default + AddAssign;
}

impl<T> ParallelSum<T> for Arc<Vec<T>>  {
    fn parallel_sum<'a>(&self, nb_threads: usize) -> T
    where
        T: 'a + Sum<&'a T> + Send + Sync + Default + AddAssign,
    {
        // create a range with the number of elements of the vector
        let range = 0..self.len();

        // create a partition from that range
        let chunks = range.chunks(nb_threads);

        // create the channel to be able to receive partial sums from threads
        let (sender, receiver): (Sender<T>, Receiver<T>) = mpsc::channel();

        // this vec will hold thread handles spawend from this main thread
        let mut threads = Vec::new();

        // need to create an Arced version of the vector
        //let arced = Arc::new(self);

        // create threads: each thread will get the partial sum
        for chunk in chunks {
            // increment ref counter
            let arced_cloned = self.clone();

            // each thread gets its invidual sender
            let thread_sender = sender.clone();

            // spawn thread
            let child = thread::spawn(move || {
                // calculate partial sum
                let partial_sum: T = arced_cloned[chunk].iter().sum();

                // send it through channel
                thread_sender.send(partial_sum).unwrap();
            });

            // save thread ID
            threads.push(child);
        }

        // get results from threads
        let mut total_sum = T::default();

        for _ in 0..threads.len() {
            // main thread receives the partial sum from threads
            let partial_sum = receiver.recv().unwrap();

            //println!("Received partial sum = {:?}", partial_sum);

            // and get the final total sum
            total_sum += partial_sum
        }

        total_sum
    }
}

but I get another error I don't understand:

error[E0310]: the parameter type `T` may not live long enough
  --> src/main.rs:83:25
   |
54 | impl<T> ParallelSum<T> for Arc<Vec<T>>  {
   |      - help: consider adding an explicit lifetime bound `T: 'static`...
...
83 |             let child = thread::spawn(move || {
   |                         ^^^^^^^^^^^^^
   |
note: ...so that the type `[closure@src/main.rs:83:39: 89:14 arced_cloned:std::sync::Arc<std::vec::Vec<T>>, chunk:std::ops::Range<usize>, thread_sender:std::sync::mpsc::Sender<T>]` will meet its required lifetime bounds
  --> src/main.rs:83:25
   |
83 |             let child = thread::spawn(move || {
   |                         ^^^^^^^^^^^^^

error: aborting due to previous error

I'm lost. Any help appreciated !

Just follow the instructions

help: consider adding an explicit lifetime bound T: 'static...

This is happening becaue T could contain a reference, and if it is then it must outlive 'static for it to be safe to send to another thread. This is because 'static means that the value will live as long as it needs to, and another thread could outlive the main thread!

You could put that for loop inside a crossbeam::scope and use its spawn to work with non-'static lifetimes. Or for a turnkey solution, Rayon also implements sum(), and makes it easy to use custom fold+reduce in general.

3 Likes

Year but I wanted to experiment by myself without Rayon. This is really painful...

I'm a little bit reluctant to use external crates. It's hidding language complexity and prevents you to learn it. Really painful for such a simple task: parallel summation.

Dont' get me wrong; I love Rust. But so many hurdles to clear for such simple tasks. I can't imagine when more complex stuff is on the table.

It's only simple in other languages because either they don't bother making sure your objects live long enough, or they have a garbage collector such that all references act like shared owners in this respect.

The standard library did have a thread::scoped API before Rust 1.0, but it was found to be unsound -- see this post describing the Leakpocalypse. So now we're in a state where safer alternatives were developed in external crates like crossbeam and rayon, and since the ecosystem generally encourages the use of crates, there hasn't been a push to bring this back into std.

5 Likes

I can understand your point.

My fear is that Rust will become an over-skilled expert language only used by a niche set of programmers. Why is Python sucessful? Because of its elegant and straightforward simplicity, and its richness in terms of standard library. And its large set of external libs. Of course if more than 20 years old...

Use of external crates is OK for me. But my point is for such so mainstream features like XML, http, etc, it should be in the std language like in Go or Java. I hope it's only a matter of workforce, not a voluntary policy.

But it's only my point and I'm not a proselyte.

Even for a seasoned programmer and architect like me, touching Rust threads is difficult... but I'm learning :wink:

BTW, how to get rid of my errors? I'm stuck now.

1 Like

Note that Python's large standard library is not always such a good thing -- see this recent criticism, "Batteries Included, But They're Leaking".

I took the liberty of chopping this down to what I think is more idiomatic overall: playground

use std::iter::Sum;
use std::sync::mpsc;

pub trait ParallelSum<T> {
    fn parallel_sum(&self, nb_threads: usize) -> T
    where
        T: for<'a> Sum<&'a T> + Sum<T> + Send + Sync;
}

impl<T> ParallelSum<T> for [T] {
    fn parallel_sum(&self, nb_threads: usize) -> T
    where
        T: for<'a> Sum<&'a T> + Sum<T> + Send + Sync,
    {
        // figure out the right size for the number of threads, rounded up
        let chunk_size = (self.len() + nb_threads - 1) / nb_threads;

        // create the channel to be able to receive partial sums from threads
        let (sender, receiver) = mpsc::channel::<T>();

        crossbeam::scope(|scope| {
            // create threads: each thread will get the partial sum
            for chunk in self.chunks(chunk_size) {
                // each thread gets its invidual sender
                let thread_sender = sender.clone();

                // spawn thread
                scope.spawn(move |_| {
                    // calculate partial sum
                    let partial_sum: T = chunk.iter().sum();

                    // send it through channel
                    thread_sender.send(partial_sum).unwrap();
                });
            }

            // drop our remaining sender, so the receiver won't wait for it
            drop(sender);

            // sum the results from all threads
            receiver.iter().sum()
        })
        .unwrap()
    }
}

fn main() {
    let vec: Vec<i32> = (0..1000).collect();
    println!("sum: {}", vec.iter().sum::<i32>());
    println!("parallel_sum(2): {}", vec.parallel_sum(2));
    println!("parallel_sum(4): {}", vec.parallel_sum(4));
}
6 Likes

Thanks a lot, I'll take a look tomorrow as it's late here now.

But you're using an external crate :wink:

In Rust, std primarily contains functionality that requires special support in the compiler. Functionality that does not require special support typically is developed in other crates.

Because of Rust's guarantees of backward compatibility it is very difficult to improve functionality in std, whereas functionality in other crates can be improved through semver without breaking Rust's guarantees. That is one reason why Rust is not a "batteries included" language, because it's almost impossible to replace defective batteries if they are in std.

12 Likes

@TomP Thanks for your explanation.

So does this mean that what I try do achieve is impossible without resorting to an external crate or duplicating what this external crate is providing?

Let me play the devil's advocate. Suppose I'm a manager of some software company very interested in what Rust is providing. I'm learning that Rust ecosystem and find out sometimes lots of external crates are necessary for a simple task (e.g. regexes). I will soon ask: "who's supporting those crates? If a crate is not supported or developed anymore, how to get support and from whom?". These are legitimate questions any software manager will ask, as well as skilled people availability.

Once again, this is a constructive criticism and I love Rust. My opinion is that you can't publicly advertise for safety features for which you necessarily need to download external packages. C++ and D standard library is large, 24V batteries included. But for sure older than Rust is.

1 Like

Well, since you're mentioning C++, note that its standard library has historically been very lean too. Many high-level features like regexes are C++11 additions which only landed in mainstream compilers a couple years ago, and "classic" standard library features of other languages like XML and JSON support (or even TCP/IP sockets !) are still absent from the STL.

As surprising as it may sound today, old C++ standard revisions did not even have threads or hashmaps in their standard libraries, and yet they were nonetheless extremely successful in use cases ranging from operating system development to video games and high-frequency trading.

The same can be said of C, whose standard library is even smaller than C++98's was. Many of the "libc" functions that are typically considered to belong to the standard C library are actually Posixisms or Linuxisms which are not supported on all operating systems, and may be quite badly emulated on some (asynchronous disk I/O on Linux being a classic example).

D never really achieved very broad industry adoption as far as I know, so I'm not sure if its community ever tackled this problem.


I think the way the C/++ situation historically worked out is that either the maintainers of external libraries or third-party companies like Linux distribution maintainers offered support contracts for heavily used external libraries (and open-source C/++ compilers, for that matter). A similar strategy could probably be successful for Rust.

But overall, it seems to me that by the standard of other bare-metal-friendly programming languages (native binaries & no GC), Rust's standard library isn't unusually thin. If anything, it is a bit thicker than the norm, though the existence of #[no_std] ensures that this doesn't cause too many portability problems in embedded environments.

7 Likes

Why don't you ask it for stdlib too? Surely because you trust the core team. But many of those "external libraries" are supported or even maintained by the core team too! If you want some lost of them, rust-lang-nursery is a place for libraries with extra care from them. But it's not a complete list of it, so always check the contributors to know if it's supported by trusted people.

2 Likes

For sure I trust the core team. I'm ok for downloading crates for specific needs, but my opinion is that basic and common (like regex) features should be in std. Regex is standard is all modern languages including Kotlin native. The Rust regex crate is depending on at least 5 other crates, not counting recursive dependencies.

I wholeheartedly hope that Rust will succeed. I'm just wondering if it will. Being a favorite's dev language doesn't make necessarily a success. It should be accepted and used by the industry.

As for C+, several boost libs are now in the std. But C++ has a long history and background.

In other languages useful stuff is pulled into std to be easy to use, because dependencies are a pain there. In Rust, writing use regex is as easy as use std::regex.

For Rust's success being lean is also important. People already complain that "Hello World" is 700KB. It'd be worse if it was 700KB + regex library + kitchen sink.

2 Likes

@kornel I don't think that dependency is specially painful in Java, D, Kotlin...

Anyway we can discuss hours but I don' know how I can change my mind because the more I use Rust, the more I'm convinced. This is just my opinion. Downloading tons of crates to achieve a simple task is not my vision of simplicity specially when it comes to use an external crate to fulfill a simple requirement :wink:

Coming back to my problem, the most I can achieve without an external crate is:

fn parallel_sum<T>(v: Vec<T>, nb_threads: usize) -> T
where
    T: 'static + Send + Sync + Debug + AddAssign + Default + Copy,
{
    // this vector will hold created threads
    let mut threads = Vec::new();

    // need to arced the vector to share it
    let arced = Arc::new(v);

    // this channel will be use to send values (partial sums) for threads
    let (sender, receiver) = mpsc::channel::<T>();

    // create requested number of threads
    for thread_number in 0..nb_threads {
        // increment ref count, will be moved into the thread
        let arced_cloned = arced.clone();

        // each thread gets its invidual sender
        let thread_sender = sender.clone();

        // create thread and save ID for future join
        let child = thread::spawn(move || {
            // initialize partial sum
            let mut partial_sum: T = T::default();

            // this line doesn't compile:
            // partial_sum = arced_cloned.into_iter().sum();

            // trivial old style loop
            for i in 0..arced_cloned.len() {
                //
                if i % nb_threads == thread_number {
                    partial_sum += *arced_cloned.get(i).unwrap();
                }
            }

            // send our result to main thread
            thread_sender.send(partial_sum).unwrap();
            
            // print out partial sum
            println!(
                "thread #{}, partial_sum of modulo {:?} = {:?}",
                thread_number, thread_number, partial_sum
            );
        });
        
        // save thread ID
        threads.push(child);
    }
    
    // wait for children threads to finish
    for child in threads {
        let _ = child.join();
    }
    
    // terminate sender and get final sum
    //drop(sender);

    let mut total_sum = T::default();
    for _ in 0..nb_threads {
        // main thread receives the partial sum from threads
        let partial_sum = receiver.recv().unwrap();

        // and get the final total sum
        total_sum += partial_sum
    }
    
    total_sum
}

Probably sup-optimal but compiling and working. I found it incredibly painful for sharing an immutable value, not even touching mutation.

Thanks to @cuviper and this thread Why does thread::spawn need static lifetime for generic bounds? - #2 by lambda for a better understanding.

May not be painful, but with Rust it is painless. In the vast majority of cases you just stick a line in Cargo.toml and you're done. Just use the crate as if it were another module to your crate (in the 2018 edition).

1 Like

There's no need to use channels.

use std::sync::Arc;
use std::thread::spawn;
use std::cmp::min;
use std::ops::AddAssign;

fn main() {
    let mut vec = Vec::with_capacity(1000000);
    for i in 0..1000000u64 {
        vec.push(i*i);
    }
    let sum: u64 = vec.iter().cloned().sum();
    println!("local:    {}", sum);
    println!("parallel: {}", parallel_sum(vec, 11));
}


fn parallel_sum<T>(v: Vec<T>, nb_threads: usize) -> T
where
    T: 'static + Send + Sync + AddAssign + Default + Copy,
{
    if nb_threads == 0 { panic!("At least one thread required."); }
    if nb_threads > v.len() { panic!("More threads than items in vector."); }
    if v.len() == 0 { return T::default(); }

    // divide round up
    let items_per_thread = (v.len()-1) / nb_threads + 1;

    let arc = Arc::new(v);
    let mut threads = Vec::with_capacity(nb_threads);

    for i in 0..nb_threads {
        let data = arc.clone();
        let thread = spawn(move || {
            let from = i * items_per_thread;
            let to = min(from + items_per_thread, data.len());

            let mut sum = T::default();
            for v in &data[from..to] {
                sum += *v;
            }
            sum
        });
        threads.push(thread);
    }
    let mut sum = T::default();
    for t in threads {
        sum += t.join().expect("panic in worker thread");
    }
    sum
}
4 Likes

@alice Thanks for the tip :wink:

It was also an educational example to use channels. I'll probably keep it in my article, but anyway good to know this method.

Channels are a prime example why std is better off to be small.

The std channels are slower and less flexible than the crossbeam crate. The std channel has even a rare crasher bug that has no fix. There was a plan to either replace std implementation with crossbeam's (but not fully, since old naming and limitations have to stay for back compatibility) or just totally deprecate them and tell people to install crossbeam.

4 Likes

Yes, ::std represents a version 1.x.y that will never be able to reach 2.0 since it needs to remain backwards compatible forever. Having an ecosystem based on such a library just to save a few lines in a configuration file is not be the best choice for the long run.

2 Likes