Strange memory behavior with rayon

Hi,
I have a small piece of code that causes huge memory use, which isn't freed afterwards:

rayon::scope(|scope| {
    for (tree1, record1) in records {
        for (tree2, record2) in records {
            for fun in TWO_ARG_FUNCTIONS.iter() {
                let thread_outcomes = outcomes.clone();
                scope.spawn(move |_| {
                    let guesses = (fun.fun)(record1.clone(), record2);
                    if guesses.iter().find(|&v| !v.is_finite()).is_none() {
                        let predictions = sort_guesses(guesses, thread_outcomes);
                    }
                });
            }
        }
    }
});

Without the call to sort_guesses there is no high memory use. However, the same code without rayon:

// rayon::scope(|scope| {
    for (tree1, record1) in records {
        for (tree2, record2) in records {
            for fun in TWO_ARG_FUNCTIONS.iter() {
                let thread_outcomes = outcomes.clone();
                // scope.spawn(move |_| {
                    let guesses = (fun.fun)(record1.clone(), record2);
                    if guesses.iter().find(|&v| !v.is_finite()).is_none() {
                        let predictions = sort_guesses(guesses, thread_outcomes);
                    }
                // });
            }
        }
    }
// });

runs fine, i.e. without high memory use. There is no unsafe in my code.

The function sort_guesses is rather simple:

pub fn sort_guesses(guesses: Vec<f64>, outcomes: Vec<&Prediction>) -> Vec<(f64, &Prediction)> {
    let mut predictions = guesses.into_iter().zip(outcomes).collect::<Vec<(f64, &Prediction)>>();
    predictions.sort_unstable_by(|(first, _), (second, _)| first.partial_cmp(second).unwrap());
    predictions
}

What is extra surprising is that even when I add drop(predictions) right after the sort_guesses call, memory usage still increases with each invocation (this code runs in a loop).
Valgrind shows about 120 kB still reachable and 2 kB possibly lost at the end of my program. However, memory use grows by about 2 GB with every loop iteration.

Does anyone have any suggestion what to try next?
Thanks


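Calling sort_guesses makes the closure capture thread_outcomes = outcomes.clone(), which I'm guessing is the main memory user. Without that call, the closure body never uses thread_outcomes, so it isn't captured and the clone is dropped at the end of each loop iteration. When you spawn a bunch of these, the closures are all boxed and added to the scope's queue, so you'll have many clones of outcomes alive at the same time.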
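To make the capture point concrete, here is a small single-threaded sketch (not rayon code): a move closure only takes ownership of the variables its body actually mentions. Tracked is a hypothetical stand-in for the cloned outcomes that counts its own drops, and a plain Vec of boxed closures only loosely mimics a scope's pending spawns.

```rust
use std::cell::Cell;
use std::rc::Rc;

// Hypothetical stand-in for the cloned outcomes: every copy shares a counter
// that is bumped when that copy is dropped.
#[derive(Clone)]
struct Tracked {
    data: Vec<u64>,
    dropped: Rc<Cell<usize>>,
}

impl Tracked {
    fn len(&self) -> usize {
        self.data.len()
    }
}

impl Drop for Tracked {
    fn drop(&mut self) {
        self.dropped.set(self.dropped.get() + 1);
    }
}

// Returns the drop count after (a) queueing closures that ignore their clone,
// (b) queueing closures that use it, and (c) running every queued closure.
fn capture_demo() -> (usize, usize, usize) {
    let dropped = Rc::new(Cell::new(0));
    let outcomes = Tracked { data: vec![0; 1024], dropped: Rc::clone(&dropped) };
    // A plain Vec of boxed jobs, loosely mimicking a scope's pending spawns.
    let mut queue: Vec<Box<dyn FnOnce() -> usize>> = Vec::new();

    for _ in 0..3 {
        let _thread_outcomes = outcomes.clone();
        // The body never mentions the clone, so `move` does not capture it;
        // it is dropped at the end of this loop iteration.
        queue.push(Box::new(move || 0));
    }
    let a = dropped.get();

    for _ in 0..3 {
        let thread_outcomes = outcomes.clone();
        // The body uses the clone, so it moves into the boxed closure and
        // stays alive until the job runs.
        queue.push(Box::new(move || thread_outcomes.len()));
    }
    let b = dropped.get();

    // Calling each boxed FnOnce consumes it, dropping whatever it captured.
    for job in queue {
        job();
    }
    let c = dropped.get();
    (a, b, c)
}

fn main() {
    let (a, b, c) = capture_demo();
    println!("drops: {a} after unused captures, {b} after used captures, {c} after running");
}
```

capture_demo() returns (3, 3, 6): the unused clones are freed right away, while the captured ones survive until their jobs run.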

Instead of cloning that, can you just share it? It appears to be defined outside the scope, which means your spawns can borrow it directly: pass something like outcomes: &[Prediction] to your sort_guesses.
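A sketch of the borrowing approach. It uses std::thread::scope (Rust 1.63+) in place of rayon::scope only so it runs without extra crates; both let spawned closures borrow data that outlives the scope. Prediction and first_ids are hypothetical, and sort_guesses takes a slice as suggested.

```rust
use std::thread;

// Hypothetical stand-in for the poster's Prediction type.
struct Prediction(u32);

// Borrowing variant of sort_guesses: it reads the outcomes through a slice
// instead of consuming an owned Vec.
fn sort_guesses(guesses: Vec<f64>, outcomes: &[Prediction]) -> Vec<(f64, &Prediction)> {
    let mut predictions: Vec<(f64, &Prediction)> = guesses.into_iter().zip(outcomes).collect();
    predictions.sort_unstable_by(|(a, _), (b, _)| a.partial_cmp(b).unwrap());
    predictions
}

// Hypothetical helper: one scoped thread per guess vector, all sharing the same
// `outcomes` slice. Returns the id of the lowest-guess outcome for each job.
fn first_ids(jobs: Vec<Vec<f64>>, outcomes: &[Prediction]) -> Vec<u32> {
    thread::scope(|s| {
        let mut handles = Vec::new();
        for guesses in jobs {
            // `outcomes` is a shared reference (Copy), so this move closure
            // copies the pointer; no outcome data is cloned.
            handles.push(s.spawn(move || {
                let sorted = sort_guesses(guesses, outcomes);
                let (_, best) = sorted[0];
                best.0
            }));
        }
        handles.into_iter().map(|h| h.join().unwrap()).collect()
    })
}

fn main() {
    let outcomes = vec![Prediction(10), Prediction(20), Prediction(30)];
    let jobs = vec![vec![0.3, 0.1, 0.2], vec![0.5, 0.9, 0.7]];
    println!("{:?}", first_ids(jobs, &outcomes)); // prints [20, 10]
    // `outcomes` is still owned here and was never cloned.
    println!("{}", outcomes.len());
}
```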

I would also suggest using parallel iterators rather than a nest of for-loop spawns. Something like records.par_iter().for_each(|(tree1, record1)| { ...etc... }) should be much better at dynamically splitting the work across threads.

In the serial case, you clone outcomes, use the clone, then drop it before the next iteration, so only one clone is alive at a time.

Unfortunately not, because sort_guesses consumes it (in order to put the outcomes in order of their guesses).

But shouldn't this all be freed when rayon::scope returns?

Thanks for your help.

At least in what you showed here, it doesn't really need to consume a Vec. That short snippet could be rewritten like this:

pub fn sort_guesses(guesses: Vec<f64>, outcomes: &[Prediction]) -> Vec<(f64, &Prediction)> {
    let mut predictions = guesses.into_iter().zip(outcomes).collect::<Vec<(f64, &Prediction)>>();
    predictions.sort_unstable_by(|(first, _), (second, _)| first.partial_cmp(second).unwrap());
    predictions
}

That's assuming you can provide this as a slice of direct Prediction values. If you need that indirection, then something like this should work:

pub fn sort_guesses<'a>(guesses: Vec<f64>, outcomes: &[&'a Prediction]) -> Vec<(f64, &'a Prediction)> {
    let mut predictions = guesses.into_iter()
        .zip(outcomes.iter().copied())
        .collect::<Vec<(f64, &Prediction)>>();
    predictions.sort_unstable_by(|(first, _), (second, _)| first.partial_cmp(second).unwrap());
    predictions
}
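A quick check of the indirection variant, using a hypothetical Prediction(u32) type and a hypothetical ids_after_sorting helper. Because the result borrows with lifetime 'a (the Predictions themselves) rather than from the slice of references, the same Vec<&Prediction> can be reused for several calls without cloning, and can even be dropped while the sorted results are still in use.

```rust
// Hypothetical stand-in for the poster's Prediction type.
struct Prediction(u32);

pub fn sort_guesses<'a>(guesses: Vec<f64>, outcomes: &[&'a Prediction]) -> Vec<(f64, &'a Prediction)> {
    let mut predictions = guesses
        .into_iter()
        .zip(outcomes.iter().copied())
        .collect::<Vec<(f64, &Prediction)>>();
    predictions.sort_unstable_by(|(first, _), (second, _)| first.partial_cmp(second).unwrap());
    predictions
}

// Extracts the Prediction ids in sorted order.
fn ids(sorted: &[(f64, &Prediction)]) -> Vec<u32> {
    sorted.iter().map(|(_, p)| p.0).collect()
}

// Hypothetical helper: sorts two guess vectors against one list of references,
// then drops that list while the sorted results are still in use.
fn ids_after_sorting(all: &[Prediction]) -> (Vec<u32>, Vec<u32>) {
    let outcomes: Vec<&Prediction> = all.iter().collect();
    let first = sort_guesses(vec![3.0, 1.0, 2.0], &outcomes);
    let second = sort_guesses(vec![0.5, 0.7, 0.1], &outcomes);
    // Compiles because the results borrow from `all` ('a), not from `outcomes`.
    drop(outcomes);
    (ids(&first), ids(&second))
}

fn main() {
    let all = vec![Prediction(10), Prediction(20), Prediction(30)];
    let (first, second) = ids_after_sorting(&all);
    println!("{:?} {:?}", first, second); // prints [20, 30, 10] [30, 10, 20]
}
```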

Yes, but now you get into how to define "memory usage", because your allocator probably won't actually return that memory to the operating system right away. So it will still be counted against your process's virtual size, and probably even its resident size.

That "free" memory should get reused the next time around though, so if you're repeating this with similar-sized work many times, I would expect that to reach a steady-state maximum. If not, maybe you've triggered some pathological behavior in the allocator, but it's hard to tell.

If you can't change sort_guesses at all, another option is to delay the clone until inside the spawn:

    scope.spawn(move |_| {
        let guesses = (fun.fun)(record1.clone(), record2);
        if guesses.iter().find(|&v| !v.is_finite()).is_none() {
            let thread_outcomes = outcomes.clone();
            let predictions = sort_guesses(guesses, thread_outcomes);
        }
    });

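Now the spawn closure only captures a reference to the original outcomes and clones it when the job actually runs. (This assumes outcomes is already a reference, e.g. &Vec<&Prediction>. Because the closure is move, an owned Vec would instead be moved into the first closure and fail to compile on the next iteration, so in that case shadow it first with let outcomes = &outcomes; before the loops.)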
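To compare the two placements of the clone, here is a rough single-threaded simulation. Live is a hypothetical stand-in for the outcomes Vec that tracks how many copies exist at once, and a Vec of boxed closures stands in for pending spawns. Every job is queued before any runs, which models a worst case; real rayon workers start executing while the loop is still spawning, so actual peaks for the eager version would likely be lower.

```rust
use std::cell::Cell;
use std::rc::Rc;

// Shared counters: copies currently alive and the highest number seen at once.
#[derive(Default)]
struct Stats {
    alive: Cell<usize>,
    peak: Cell<usize>,
}

// Hypothetical stand-in for the outcomes Vec that reports to Stats.
struct Live {
    data: Vec<u64>,
    stats: Rc<Stats>,
}

impl Live {
    fn new(data: Vec<u64>, stats: Rc<Stats>) -> Self {
        let alive = stats.alive.get() + 1;
        stats.alive.set(alive);
        stats.peak.set(stats.peak.get().max(alive));
        Live { data, stats }
    }

    fn len(&self) -> usize {
        self.data.len()
    }
}

impl Clone for Live {
    fn clone(&self) -> Self {
        Live::new(self.data.clone(), Rc::clone(&self.stats))
    }
}

impl Drop for Live {
    fn drop(&mut self) {
        self.stats.alive.set(self.stats.alive.get() - 1);
    }
}

// Queues `jobs` closures before running any of them and returns the peak
// number of Live copies alive at once, the original included.
fn peak_copies(jobs: usize, defer_clone: bool) -> usize {
    let stats = Rc::new(Stats::default());
    let owned = Live::new(vec![0; 1024], Rc::clone(&stats));
    // Shadow with a reference, so `move` closures copy the reference, not the data.
    let outcomes = &owned;
    let mut queue: Vec<Box<dyn FnOnce() -> usize + '_>> = Vec::new();

    for _ in 0..jobs {
        if defer_clone {
            // Suggested shape: the job holds only the reference and clones when it runs.
            queue.push(Box::new(move || outcomes.clone().len()));
        } else {
            // Original shape: clone up front and move the clone into the job.
            let thread_outcomes = outcomes.clone();
            queue.push(Box::new(move || thread_outcomes.len()));
        }
    }

    // Each job is consumed when it runs, which drops anything it captured.
    for job in queue {
        job();
    }
    stats.peak.get()
}

fn main() {
    println!("eager clone:    peak {}", peak_copies(100, false)); // prints 101
    println!("deferred clone: peak {}", peak_copies(100, true)); // prints 2
}
```

With the eager clone, every pending job owns its own copy; with the deferred clone, at most one extra copy exists at a time in this sequential simulation.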

Brilliant, thanks! Silly me, I somehow missed that zipping with a slice already yields &Prediction items.

It doesn't seem to level off: each loop iteration adds about 10-15% of available memory, then the system starts swapping, and eventually the OS kills the process.
I will look into telling the allocator to return freed memory to the OS.

With your change to sort_guesses I was able to use par_iter() instead of spawn() and memory usage is back to normal.
Thanks for your help.
