Feedback: Ch. 16, Multi-threaded, operate on non-overlapping slices of vector

I've finished Chapter 16 of The Rust Programming Language. I wrote a small program to practice what I've learned so far. The program creates N threads for N chunks of a vector (I believe this is a common practice problem, though I'm not sure). Each thread performs the same summation operation on its chunk. After the threads finish, the main thread sums together all of the threads' results and prints the total. The comments have more information. Please let me know your thoughts. Thank you.

//! Operate on non-overlapping slices of a vector.
//!
//! Have multiple threads operate on non-overlapping slices of the same vector
//! at the same time. Since no thread needs exclusive access to the vector, a
//! `Mutex` is not used.
use std::sync::Arc;
use std::thread;
use std::thread::JoinHandle;

use crate::shared_chunk::SharedChunkIterator;

/// A shared chunk is a non-overlapping slice of an `Arc<Vec<T>>`.
mod shared_chunk {
    use std::ops::{Deref, Range};
    use std::sync::Arc;

    /// A `SharedChunk` represents a slice of an `Arc<Vec<T>>` that can be moved between threads.
    ///
    /// This struct is created by the `SharedChunkIterator`.
    ///
    /// # Examples
    ///
    /// ```
    /// use std::sync::Arc;
    /// let shared_vec = Arc::new(vec![1, 2, 3]);
    /// let shared_chunk_iter = SharedChunkIterator::new(Arc::clone(&shared_vec), 2);
    /// for chunk in shared_chunk_iter {
    ///     println!("chunk sum: {}", chunk.iter().sum::<i32>());
    /// }
    /// ```
    #[derive(Debug)]
    pub struct SharedChunk<T> {
        v: Arc<Vec<T>>,
        range: Range<usize>,
    }

    impl<T> SharedChunk<T> {
        fn new(v: Arc<Vec<T>>, range: Range<usize>) -> Self {
            Self { v, range }
        }
    }

    impl<T> Deref for SharedChunk<T> {
        type Target = [T];

        fn deref(&self) -> &Self::Target {
            &self.v[self.range.clone()]
        }
    }

    /// An iterator that produces `SharedChunk`s.
    ///
    /// # Examples
    /// ```
    /// use std::sync::Arc;
    /// let shared_vec = Arc::new(vec![1, 2, 3]);
    /// let mut shared_chunk_iter = SharedChunkIterator::new(Arc::clone(&shared_vec), 2);
    /// assert_eq!(&*shared_chunk_iter.next().unwrap(), &[1, 2]);
    /// assert_eq!(&*shared_chunk_iter.next().unwrap(), &[3]);
    /// assert!(shared_chunk_iter.next().is_none());
    /// ```
    pub struct SharedChunkIterator<T> {
        shared_vec: Arc<Vec<T>>,
        chunk_size: usize,
        chunk_count: usize,
    }

    impl<T> SharedChunkIterator<T> {
        pub fn new(shared_vec: Arc<Vec<T>>, chunk_size: usize) -> Self {
            SharedChunkIterator {
                shared_vec,
                chunk_size,
                chunk_count: 0,
            }
        }
    }

    impl<T> Iterator for SharedChunkIterator<T> {
        type Item = SharedChunk<T>;

        fn next(&mut self) -> Option<Self::Item> {
            let start = self.chunk_count * self.chunk_size;
            let end = self.shared_vec.len().min(start + self.chunk_size);

            if start < end {
                self.chunk_count += 1;
                Some(SharedChunk::new(Arc::clone(&self.shared_vec), start..end))
            } else {
                None
            }
        }
    }
}

fn main() {
    let nums = vec![
        1.1, 2.1, 3.1, 4.1, 5.1, 6.1, 7.1, 8.1, 9.1, 10.1, 11.1, 12.1, 13.1, 14.1, 15.1, 16.1,
        17.1, 18.1, 19.1, 20.1,
    ];

    let shared_nums = Arc::new(nums);

    let mut handles: Vec<JoinHandle<f64>> = vec![];

    const NUM_THREADS: usize = 5;
    let chunk_size = (shared_nums.len() as f64 / NUM_THREADS as f64).ceil() as usize;
    println!("chunk_size {}", chunk_size);

    // I use `SharedChunkIterator` to produce chunks that can be moved into the thread closures.
    // The iterator creates each chunk with its own clone of `shared_nums` (via `Arc::clone`),
    // so every chunk holds a reference-counted handle to the same vector.
    // shared_chunk_iter.next() -> SharedChunk
    let shared_chunk_iter = SharedChunkIterator::new(Arc::clone(&shared_nums), chunk_size);
    for chunk in shared_chunk_iter {
        // Inside the thread, treat `chunk` as a slice since a `SharedChunk` dereferences to a slice
        let handle = thread::spawn(move || chunk.iter().sum());

        handles.push(handle);
    }

    let mut sum = 0.0;
    for handle in handles {
        let r = handle.join().unwrap();
        println!("{}", r);
        sum += r;
    }

    let actual_sum: f64 = Arc::clone(&shared_nums).iter().sum();
    debug_assert!((sum - actual_sum).abs() > f64::EPSILON);
    println!("sum = {}", sum);
}

Isn't the inequality supposed to point in the other direction? Surely you want to assert that the difference is smaller than some epsilon, no?

You don't need Arc::clone here.
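
That is, for `actual_sum` you can iterate through the existing `Arc` directly; a minimal sketch of the simplified line:

let actual_sum: f64 = shared_nums.iter().sum();

`Arc<Vec<T>>` auto-derefs to the vector (and then to a slice), so `.iter()` borrows through the handle; a clone of the `Arc` is only needed when you move ownership of a handle somewhere else, such as into a spawned thread.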

Thank you @H2CO3.

Yes, the inequality is supposed to point in the other direction. And this brings up another issue: it's not a correct way to compare floats in the first place. When I correct the inequality sign, the assertion fails. Comparing floats is a whole other topic, so I won't focus on it too much for now.

Regarding Arc::clone, got it. Thanks again.

Indeed it does, but this is actually expected.

Floating-point operations are not associative; you are not guaranteed to stay within 1 ULP if you perform many additions grouped differently. Furthermore, your sum is not in the interval [1, 2), so you can't expect the absolute difference to be comparable to, or less than, the machine epsilon.
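
A tiny, self-contained illustration of that non-associativity (the classic 0.1/0.2/0.3 case; the exact digits assume IEEE 754 doubles):

fn main() {
    let (a, b, c) = (0.1_f64, 0.2_f64, 0.3_f64);
    // The same three values summed in a different order differ in the last bit.
    println!("{}", (a + b) + c); // 0.6000000000000001
    println!("{}", a + (b + c)); // 0.6
    assert_ne!((a + b) + c, a + (b + c));
}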

You should multiply the allowed absolute difference by at least NUM_THREADS * max(sum, actual_sum); as expected, this makes the assertion pass. Every chunk can individually introduce roughly 1 ULP of error, and the absolute magnitude of 1 ULP is directly proportional to the number under scrutiny, so in the worst case the error is approximately EPSILON * max(all numbers involved).
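
Concretely, a scaled tolerance along those lines might look like this (a sketch, not a rigorous error analysis; it reuses `sum`, `actual_sum`, and `NUM_THREADS` from the program above):

// Allow roughly 1 ULP of error per chunk, scaled to the magnitude of the values compared.
debug_assert!(
    (sum - actual_sum).abs()
        <= f64::EPSILON * NUM_THREADS as f64 * sum.abs().max(actual_sum.abs())
);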
