How to wait for all spawned jobs using rayon?

I have a struct with a method that consumes itself:

struct Foo {}

impl Foo {
    fn consume(self) {}
}

and a function to handle Foos:

use rayon::prelude::*;

fn bar(foos: Vec<Foo>) {
    foos.into_par_iter().for_each(|foo| foo.consume())
}

Now the API has changed, and my bar function can only take a mutable reference to the vector:

fn bar(foos: &mut Vec<Foo>)

This means I can't use into_par_iter anymore (let's say I don't want to make a copy), so I wrote this:

fn bar(foos: &mut Vec<Foo>) {
    while let Some(foo) = foos.pop() {
        rayon::spawn(move || {
            foo.consume();
        })
    }
}

But this solution has one problem: the function returns as soon as the jobs are submitted, not when they have finished.

So I have to add a condition variable to signal when each job is done:

use std::sync::{Arc, Condvar, Mutex};

fn bar(foos: &mut Vec<Foo>) {
    let len = foos.len();
    // Shared counter of finished jobs plus a condvar to wake the waiter.
    let condvar1 = Arc::new((Mutex::new(0), Condvar::new()));
    while let Some(foo) = foos.pop() {
        let condvar2 = condvar1.clone();
        rayon::spawn(move || {
            foo.consume();
            let (lock, cvar) = &*condvar2;
            // The guard must be mutable to update the counter.
            let mut finished = lock.lock().unwrap();
            *finished += 1;
            cvar.notify_one();
        })
    }
    let (lock, cvar) = &*condvar1;
    let mut finished = lock.lock().unwrap();
    // Block until every spawned job has reported completion.
    while *finished < len {
        finished = cvar.wait(finished).unwrap();
    }
}

This code is far more complicated than the original, so I wonder whether rayon can handle this for me. I found a function called join, but it seems to take only two closures.

Then why don't you cheat?

fn bar(foos: &mut Vec<Foo>) {
    mem::take(foos).into_par_iter().for_each(|foo| foo.consume());
}

or

fn bar(foos: &mut Vec<Foo>) {
    foos.par_drain(..).for_each(|foo| foo.consume());
}

Alternatively, it looks like what you need is exactly a Scope, since rayon::scope returns only after every task spawned inside it has finished:

fn bar(foos: &mut Vec<Foo>) {
    rayon::scope(|scope| {
        while let Some(foo) = foos.pop() {
            scope.spawn(move |_| {
                foo.consume();
            });
        }
    });
}

This is more verbose, although good old synchronous drain() helps somewhat here, too:

fn bar(foos: &mut Vec<Foo>) {
    rayon::scope(|scope| {
        foos.drain(..).for_each(|foo| {
            scope.spawn(move |_| {
                foo.consume();
            });
        });
    });
}