Concurrency mystery

Below is some Rust code that solves Project Euler #165 in a multi-threaded fashion. The code works (I appreciate any and all improvement suggestions, of course). Notice that thread 0 does more work than thread 1 (which does more work than thread 2, and so on), yet (here's the mystery) all threads end at nearly the same time. In other words, if I try to balance the work by changing the start and end values passed to `spawn_find_intersections`, thread 4 runs much longer than the rest of the threads and the overall run time goes up. I simply don't understand why this is happening.
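To see how lopsided that split ought to be, here is a minimal sketch (the `pairs_for_segment` helper is hypothetical, not part of the program below) counting how many line pairs each thread is assigned under a triangular division:

```
// Hypothetical helper: under a triangular split over n lines, row i of the
// pairwise comparison is checked against the n - 1 - i lines that follow it.
fn pairs_for_segment(n: usize, start: usize, end: usize) -> usize {
    (start..end).map(|i| n - 1 - i).sum()
}

fn main() {
    let (n, threads) = (5000, 5);
    let seg = n / threads;
    for t in 0..threads {
        // thread 0 gets 4_499_500 pairs, thread 4 only 499_500 (about 9:1)
        println!("thread {}: {} pairs", t, pairs_for_segment(n, t * seg, (t + 1) * seg));
    }
}
```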

```
extern crate num;
extern crate crossbeam;

use num::rational::{Rational64};
use std::fmt;
use std::collections::HashSet;
use std::sync::mpsc::{Sender, Receiver};
use std::sync::mpsc;
use std::env;
use crossbeam::scope;

// pseudorandom number generator ("Blum Blum Shub", as defined in the problem statement)
pub struct BlumBlumShub {
    s: u32,
    t: u32,
}

impl Default for BlumBlumShub {
    fn default() -> BlumBlumShub { 
        BlumBlumShub { s: 290797, t: 290797 % 500 }
    }
}

impl Iterator for BlumBlumShub {
    type Item = u32;
    fn next(&mut self) -> Option<u32> {
        self.s = ((self.s as u64 * self.s as u64) % 50515093) as u32;
        self.t = self.s % 500;
        Some(self.t)
    }
}

#[derive(Hash, Eq, PartialEq, Clone)]
pub struct Point {
    x: Rational64,
    y: Rational64,
}

impl Point {
    fn new(xx: Rational64, yy: Rational64) -> Point {
        Point {
            x: xx,
            y: yy,
        }
    }
    
    pub fn vector_x_prod(&self, other: &Point) -> Rational64 {
        &self.x * &other.y - &self.y * &other.x
    }
}

impl fmt::Display for Point {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "({}, {})", self.x, self.y)
    }
}

#[derive(Clone)]
pub struct Line {
    p1: Point,
    p2: Point,
}

impl Line {
    fn new(pp1: Point, pp2: Point) -> Line {
        Line {
            p1: pp1,
            p2: pp2,
        }
    }
}

impl fmt::Display for Line {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "{}-{}", self.p1, self.p2)
    }
}

pub fn makelines(n: usize) -> Vec<Line> {
    let mut bbs: BlumBlumShub = Default::default();
    let mut lines: Vec<Line> = Vec::with_capacity(n);
    for _ in 0..n {
        let x1 = bbs.next().unwrap() as i64;
        let y1 = bbs.next().unwrap() as i64;
        let dx = bbs.next().unwrap() as i64 - x1;
        let dy = bbs.next().unwrap() as i64 - y1;
        lines.push(Line::new(Point::new(Rational64::new(x1, 1), Rational64::new(y1, 1)),
          Point::new(Rational64::new(dx, 1), Rational64::new(dy, 1))));
    }
    lines
}

// Work is the type returned by a worker thread: either an intersection
// point (Item) or a signal that the worker is done (Sentinel)
enum Work {
    Item(Point),
    Sentinel(usize),
}

// worker function to find intersection points in the passed vector and return them
// note that the function is parameterized on a scope allowing lines to be shared with the calling function
fn spawn_find_intersections<'a>(sc: &crossbeam::Scope<'a>, lines: &'a Vec<Line>, tx: Sender<Work>, start: usize, end: usize, n_thread: usize) {
    sc.spawn(move || {
        let br_zero = Rational64::new(0, 1);
        let br_one = Rational64::new(1, 1);
        
        for (it, l1) in lines[start..end].iter().enumerate() {
            for l2 in lines[(it+1)..lines.len()].iter() {
                let rxs = l1.p2.x * l2.p2.y - l1.p2.y * l2.p2.x; // inline version of l1.p2.vector_x_prod(&l2.p2);
                if rxs != br_zero {
                    let p = Point::new(&l2.p1.x - &l1.p1.x, &l2.p1.y - &l1.p1.y);
                    let t = p.vector_x_prod(&Point::new(&l2.p2.x / &rxs, &l2.p2.y / &rxs));
                    if t > br_zero && t < br_one {
                        let u = p.vector_x_prod(&Point::new(&l1.p2.x / &rxs, &l1.p2.y / &rxs));
                        if u > br_zero && u < br_one {
                            tx.send(Work::Item(Point::new(&l1.p1.x + &t * &l1.p2.x, &l1.p1.y + &t * &l1.p2.y))).unwrap();
                        }
                    }
                }
            }
        }
        tx.send(Work::Sentinel(n_thread)).unwrap();
    });
}

fn main() {
    const LIM: usize = 5000;
    let mut n_threads: usize = 5;
    if let Some(arg1) = env::args().nth(1) {
        // A parameter was passed
        if let Ok(val) = arg1.parse() {
            n_threads = val;
        }
    }
    let seg_size: usize = LIM / n_threads;

    let (tx, rx): (Sender<Work>, Receiver<Work>) = mpsc::channel();
    let lines: Vec<Line> = makelines(LIM);
    let mut unique_intersections: HashSet<Point> = HashSet::new();

    // Why does this division of labor result in balanced thread work?
    // Use scope to allow lines to be "borrowed" by threads.  If scope wasn't used, we would need to pass lines.clone() below
    crossbeam::scope(|scope| {
        for i in 0..n_threads-1 {  
            spawn_find_intersections(scope, &lines, tx.clone(), i * seg_size, (i + 1) * seg_size, i);
        }
        spawn_find_intersections(scope, &lines, tx.clone(), (n_threads-1) * seg_size, LIM, n_threads-1);

        let mut sent = 0;
        while sent < n_threads {
            let result = rx.recv();
            match result {
                Ok(Work::Item(x)) => { unique_intersections.insert(x); } // discard the bool that insert() returns
                Ok(Work::Sentinel(_)) => sent += 1,
                Err(e) => println!("{}", e),
            }
        }

        println!("{}", unique_intersections.len());
    });   
}
```

It would be great if someone here could at least confirm the behavior that I'm observing. The code runs in under 5 seconds.

thanks!

This program just prints a number after a while - how did you notice the threads were not working equally hard?

Hi, thanks for looking at it. The number printed out is the answer to Project Euler problem #165: the number of unique intersection points among the 5000 line segments.

To understand the behavior I used htop and the system monitor, and I added prints to the code.

On 4 cores, looking at CPU usage during execution, all cores are being used at 100% throughout.

Yes, that's what I'm seeing too. BUT notice that thread 0 is doing much more work than thread 4, yet finishes at about the same time. I had thought I'd adjust the start and end values to "balance" the work, but it is balanced even with the unequal amounts of work given to the threads... why?

[quote="jgilray, post:1, topic:3501"]```
for (it, l1) in lines[start..end].iter().enumerate() {
for l2 in lines[(it+1)..lines.len()].iter() {


I'm sure you meant for `l2` to go from `start+1` etc., but that's not what's happening.  The `enumerate()` has no clue about where the slice was taken -- it just counts from zero, so your `it` will effectively count `0..end-start`.  Then your `l2`s are sweeping almost everything regardless of which thread's `l1` segment you're on.
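A minimal standalone demonstration of that point (hypothetical values, just to show the counting):

```
fn main() {
    let v = [10, 20, 30, 40, 50];
    // The slice starts at index 2 of `v`, but `enumerate` still yields
    // 0, 1, 2: it has no knowledge of the slice's offset.
    for (i, x) in v[2..].iter().enumerate() {
        println!("{}: {}", i, x); // prints 0: 30, 1: 40, 2: 50
    }
}
```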

Try `for l2 in &lines[(start+it+1)..]` and you'll get the work imbalance you were expecting.  Note how I also left the end of the range open, which is as good as going to `len()`, and `&lines` lets the loop take advantage of `IntoIterator` for the same effect as calling `iter()` explicitly.
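Concretely, a sketch of the corrected loop nest inside `spawn_find_intersections` (intersection test elided, everything else unchanged):

```
for (it, l1) in lines[start..end].iter().enumerate() {
    // `it` counts from 0 within the slice, so add `start` back to get
    // l1's absolute index; l2 then begins just past it.
    for l2 in &lines[(start + it + 1)..] {
        // ... intersection test as before ...
    }
}
```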

Oh! That explains it! The mystery is solved. Thanks so much, cuviper!