Rust simple job queue

  1. Here is what I have so far:

use std::thread;

#[derive(Copy, Clone)]
pub struct Arg {}


pub fn work(arg: Arg) {
    // ...
}

pub fn run_seq(lst: &Vec<Arg>) {
    for t in lst {
        work(*t);
    }
}

pub fn run_par(lst: &Vec<Arg>) {
    for t in lst {
        let arg = *t;
        thread::spawn(move || {
            work(arg);
        });
    }
}

pub fn run_par_24(lst: &Vec<Arg>) {
    // Pseudocode for the part I'm missing:
    //
    // 1. Start 24 threads.
    // 2. Each thread does:
    //
    //        while global_queue_not_empty() {
    //            arg = pop_item(); // avoid a race with the check above
    //            work(arg);
    //        }
}
  1. I have implemented a "seq" (single thread runs everything), "par" (one thread for each task). What I need help with is implementing a "24_worker".

  2. So the idea is that we start 24 workers. Each worker then, if possible, pops an item off the global queue, runs it, and repeats until the global queue is empty.
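
For reference, a minimal std-only sketch of that idea, assuming the global queue is a `Mutex<VecDeque>`. The name `drain_with_workers` is hypothetical, and summing the items stands in for `work(arg)` so the result can be checked. The point is that `pop_front()` under the lock is the emptiness check and the pop in one step, so no other worker can get in between them.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical helper: `nthreads` workers drain a shared queue and
/// return the sum of all items (a stand-in for real work).
pub fn drain_with_workers(items: Vec<u32>, nthreads: usize) -> u32 {
    let queue = Arc::new(Mutex::new(VecDeque::from(items)));
    let total = Arc::new(Mutex::new(0u32));

    let handles: Vec<_> = (0..nthreads)
        .map(|_| {
            let queue = Arc::clone(&queue);
            let total = Arc::clone(&total);
            thread::spawn(move || loop {
                // Check-and-pop in a single locked step; the lock is
                // released at the end of this statement.
                let item = queue.lock().unwrap().pop_front();
                match item {
                    Some(x) => *total.lock().unwrap() += x, // stand-in for work(arg)
                    None => break, // queue is empty, worker exits
                }
            })
        })
        .collect();

    // Wait for every worker before reading the result.
    for h in handles {
        h.join().unwrap();
    }
    let sum = *total.lock().unwrap();
    sum
}
```

This only works when the whole list is queued before the workers start; if items arrive later, an empty queue no longer means "done".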

I find crossbeam channels useful for this kind of thing:

  • Start an appropriate number of worker threads (eg 24)
  • each thread iterates on items from the channel rx; when the iterator returns None, the iterator and the thread closure exit.
  • Feed items into the channel.
  • Drop the sender when all work has been queued, which will disconnect the channel and cause the receivers to return None (rather than block) when the channel is empty.
  • Put a bound on the channel if you need to control the depth of the queue (ie, backpressure on the insertion side, which will block if the bound is full).

snippets from a little line-processing program:

    use crossbeam_channel::bounded;

    let (s, r) = bounded::<String>(conf.nthreads * 4);

    for _tid in 0..conf.nthreads {
        let recv = r.clone();
        thread::spawn(move || {
            for line in recv.iter() {
              // work
            }
        });
    }

    let reader = BufReader::new(io::stdin());
    for line in reader.lines() {
        s.send(line?).unwrap();
    }

The only thing not shown here is termination; you need to either capture the thread::spawn() values and join() them later, or (as in my case) have some other way of knowing when the work is finished.
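
For completeness, here is a sketch of the whole pattern including termination. It is not the program the snippets came from: to keep it dependency-free it swaps `crossbeam_channel` for `std::sync::mpsc::sync_channel`, and because std's `Receiver` cannot be cloned, the workers share it behind an `Arc<Mutex<..>>`. The names `run_par_n` and `DONE` are made up for the example, and it assumes `nthreads` is at least 1.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::sync_channel;
use std::sync::{Arc, Mutex};
use std::thread;

#[derive(Copy, Clone)]
pub struct Arg {}

// Hypothetical counter so the example can check that every item ran.
static DONE: AtomicUsize = AtomicUsize::new(0);

pub fn work(_arg: Arg) {
    DONE.fetch_add(1, Ordering::SeqCst);
}

pub fn run_par_n(lst: &[Arg], nthreads: usize) {
    // Bounded queue: send() blocks while it is full (backpressure).
    let (tx, rx) = sync_channel::<Arg>(nthreads * 4);
    // std's Receiver is not Clone, so the workers share one behind a Mutex.
    let rx = Arc::new(Mutex::new(rx));

    let handles: Vec<_> = (0..nthreads)
        .map(|_| {
            let rx = Arc::clone(&rx);
            thread::spawn(move || loop {
                // The lock is held while waiting for an item, but it is
                // released before work() runs.
                let next = rx.lock().unwrap().recv();
                match next {
                    Ok(arg) => work(arg),
                    Err(_) => break, // sender dropped and queue drained
                }
            })
        })
        .collect();

    for &arg in lst {
        tx.send(arg).unwrap();
    }
    // Disconnect the channel so idle workers stop blocking and exit.
    drop(tx);

    // Termination: wait for every worker to finish.
    for h in handles {
        h.join().unwrap();
    }
}
```

Calling `run_par_n(&args, 24)` returns only after every item has been processed, because the `join()` calls cannot complete until the workers see the disconnected channel.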

Is your goal to experiment in this area and write your own, or just to get some job done in parallel? If the latter, then you can use Rayon's .par_iter() on the Vec.

I have

use std::sync::mpsc;
use std::thread;

#[derive(Copy, Clone)]
pub struct Arg {}

pub fn work(arg: Arg) {
    // ...
}

pub fn run_seq(lst: &Vec<Arg>) {
    for t in lst {
        work(*t);
    }
}

pub fn run_par(lst: &Vec<Arg>) {
    for t in lst {
        let arg = *t;
        thread::spawn(move || {
            work(arg);
        });
    }
}

pub fn run_par_24(lst: &Vec<Arg>) {
    // Idle workers announce themselves by sending their own sender here.
    let (main_sender, main_receiver) = mpsc::channel();
    let mut threads = Vec::new();

    for _ in 0..24 {
        let main_sender = main_sender.clone();
        let tid = thread::spawn(move || {
            let (thread_sender, thread_receiver) = mpsc::channel();
            // send() takes ownership, so hand over a clone each time.
            main_sender.send(thread_sender.clone()).unwrap();
            while let Some(arg) = thread_receiver.recv().unwrap() {
                work(arg);
                main_sender.send(thread_sender.clone()).unwrap();
            }
        });
        threads.push(tid);
    }

    // Give each item to whichever worker reports idle next.
    for &arg in lst {
        let to_thread = main_receiver.recv().unwrap();
        to_thread.send(Some(arg)).unwrap();
    }

    // Tell each of the 24 workers to stop.
    for _ in 0..24 {
        let to_thread = main_receiver.recv().unwrap();
        to_thread.send(None).unwrap();
    }

    // join() consumes the handle, so iterate by value.
    for t in threads {
        t.join().unwrap();
    }
}

so far, but it's so messy / complicated that I have no faith in its correctness.

It's also rather brittle in that the main thread sends out 24 None values to ensure that the child threads finish, which in turn ensures that the joins finish.

@kornel : Originally, I was going to write my own. Having done so, I think I'm just going to use an existing, well-tested library.

Do we need to reorder the s.send and thread::spawn to avoid the potential race condition of:

  1. all child threads spawn
  2. all child threads die
  3. everything gets enqueued, but nothing runs as all the child threads die

@kornel

My work function has side effects (IO). Reading the Rayon readme:

For the most, parallel iterators in particular are guaranteed to produce the same results as their sequential counterparts. One caveat: If your iterator has side effects (for example, sending methods to other threads through a Rust channel or writing to disk), those side effects may occur in a different order. Note also that, in some cases, parallel iterators offer alternative versions of the sequential iterator methods that can have higher performance.

In the case of side effects, does Rayon guarantee that each iter / map function is executed precisely once?

No. The child threads block on the iterator when it's empty but still connected. The channel (queue) can drain and refill as many times as it likes.

A channel becomes disconnected when (all clones of) either the tx or rx handles are dropped; in other words:

  • if the tx was dropped, no new items can be added. In this case, the rx will still produce items until the channel is also empty, and then return None. This lets the queue drain and then the threads exit gracefully.
  • if the rx was dropped, no items added can ever be removed. In this case, the tx.send() will error.
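
A small sketch of both cases. It uses `std::sync::mpsc` rather than crossbeam so it has no dependencies; as far as I know the disconnect behaviour shown here is the same for both. The function name `disconnect_demo` is made up for the example.

```rust
use std::sync::mpsc::channel;

/// Returns the items drained after the sender was dropped, and whether
/// send() failed once the receiver was dropped.
pub fn disconnect_demo() -> (Vec<i32>, bool) {
    // Case 1: tx dropped. Queued items are still delivered, then the
    // iterator ends instead of blocking.
    let (tx, rx) = channel::<i32>();
    tx.send(1).unwrap();
    tx.send(2).unwrap();
    drop(tx);
    let drained: Vec<i32> = rx.iter().collect();

    // Case 2: rx dropped. Nothing can ever receive, so send() errors.
    let (tx2, rx2) = channel::<i32>();
    drop(rx2);
    let send_failed = tx2.send(3).is_err();

    (drained, send_failed)
}
```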

If you did reorder them, and used a bounded channel, your main thread would probably block in s.send() on a full channel before even starting the workers. You could, of course, start the sender in a thread as well, then it doesn't matter which order you put them.

The race condition we do have to take care of, though, is the program main thread exiting before all the work is done. As I alluded to, in the example above, after loading all the input lines into the channel, you will need to know how to wait for the work to be done. That means either blocking the main thread on join() of each of the workers, or having some other way. In my case, there are more threads than shown here, and I can join on just one of them.

Edit footnote: I was sure I'd written about this recently too: Test for when crossbeam_channel is "disconnected"?

Rayon does guarantee it'll be executed precisely once. Nothing can guarantee it'll be executed in any particular order, because that would defeat the parallelism.

If you need things to happen in order, you'll need to separate them out, and first perform parallelizable (order-independent) bits, collect to an ordered array (Rayon does guarantee .collect() collects elements in their original order), and run order-dependent side effects serially.
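
Roughly, the shape is as follows. This sketch does not use Rayon: it stands in `std::thread::scope` with fixed chunks so it runs without dependencies. The names `compute` and `compute_then_emit` are hypothetical. With Rayon the first phase would be a `par_iter().map(..).collect()`.

```rust
use std::thread;

// Hypothetical pure, order-independent step.
fn compute(x: u32) -> u32 {
    x * x
}

pub fn compute_then_emit(input: &[u32], nthreads: usize) -> Vec<String> {
    let nthreads = nthreads.max(1);
    // Ceiling division; at least 1 so chunks() accepts it on empty input.
    let chunk = ((input.len() + nthreads - 1) / nthreads).max(1);

    // Phase 1: parallel work, gathered back in input order.
    let results: Vec<u32> = thread::scope(|s| {
        // Spawn all chunks first, then join them in order.
        let handles: Vec<_> = input
            .chunks(chunk)
            .map(|c| s.spawn(move || c.iter().map(|&x| compute(x)).collect::<Vec<u32>>()))
            .collect();
        handles
            .into_iter()
            .flat_map(|h| h.join().unwrap())
            .collect()
    });

    // Phase 2: order-dependent side effects, done serially
    // (building output lines stands in for writing them).
    results
        .iter()
        .enumerate()
        .map(|(i, r)| format!("{i}: {r}"))
        .collect()
}
```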

Thanks for the clarification.

I don't need any ordering guarantees. I only need a "precisely once" guarantee.

@dcarosone : Thanks for the detailed explanation. Everything is clear now:

My previous (incorrect) understanding

if bounded is empty:
  recv.iter() returns Option::None

What is truly happening:

  recv.iter() blocks until either (1) it gets an item or (2) the bounded channel is closed

  the bounded channel is NOT closed until the "sender" to the channel is dropped

spot on

FYI that's available as try_iter() for non-blocking use
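
A quick sketch of the difference, using `std::sync::mpsc` (whose `Receiver` also has a `try_iter()`) so it runs without extra crates. The name `try_iter_demo` is made up for the example.

```rust
use std::sync::mpsc::channel;

/// Returns how many items try_iter() yields on an empty channel and
/// after two sends, while the sender is still alive.
pub fn try_iter_demo() -> (usize, usize) {
    let (tx, rx) = channel::<u8>();

    // Empty but still connected: iter() would block here,
    // try_iter() returns right away with nothing.
    let empty_now = rx.try_iter().count();

    tx.send(7).unwrap();
    tx.send(8).unwrap();

    // Yields whatever is queued, then stops without waiting for more.
    let queued = rx.try_iter().count();

    (empty_now, queued)
}
```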
