Nested parallelism and Rayon

So a rather complex concurrent application of mine was producing deadlock-like symptoms in parts of the code where I myself wasn't using any locks.

And I think I know why.

Imagine you have a CPU with 16 cores, and using rayon, you have distributed task A across all 16 cores. But to complete, task A calls on an algorithm B that itself uses par_iter() to complete...

A can only complete when B has completed, but B can't make progress because the threadpool is busy with A.

This must be a deadlock, no?

@cuviper @kpreid what do you think?

I get that Rayon is LIFO (last in, first out) by default, so you'd think B, being fired after A, would complete first. But what if there are no more available cores? Then B just cannot begin.

It's fine, as long as you're not using an external mechanism to block threads. If they're perfectly balanced, the sub-parts of A will each run their B to completion on their current thread. If it's not balanced, then work stealing will spread the load when any thread would go idle.

To be concrete: B's parallel work should be using the same thread that called par_iter() to do the computation. That thread is always going to be available to it, so no computation should ever stop simply because the rest of the pool is busy.

If that doesn't work — well, either your code is doing something it shouldn't, or possibly there's a bug in Rayon that needs fixing.

I did some experimenting, and I don't think that's how rayon works.

Let's look at this code,

use rayon::prelude::*;
use std::time::Duration;

fn main() {
    rayon::ThreadPoolBuilder::new()
        .num_threads(1)
        .build_global()
        .unwrap();

    let numbers: Vec<usize> = (0..10)
        .into_par_iter()
        .map(|_: usize| {
            // Make sure we start work on all pooled threads
            std::thread::sleep(Duration::from_millis(100));
            // then try to send some more work to the thread pool and wait for
            // it to finish
            (0..8).into_par_iter().map(|n| n * n).sum()
        })
        .collect();
}

(playground)

In theory, all the thread pool's threads should be busy running the map() tasks after our 100ms sleep.

This code doesn't deadlock, so I'm guessing rayon has some sort of re-entrancy thing where using ParallelIterator methods (e.g. the sum() and second map()) will add the current thread to the list of threads available to do work.

That's what @kpreid means when they say the task's thread will always be available to do work. It just means that there's probably (depending on timings and so on) no added parallelism in the nested par_iter().

Thanks Michael, and this program is even closer to the concurrency model my app uses, and looks to me correct!

// cargo add tokio --features full
// cargo add tracing tracing_subscriber rayon

use rayon::prelude::*;
use tracing::Instrument;

// sudo apt install stress
static STRESS: bool = true;

fn op(s: u64) {
    let t = std::time::Duration::from_secs(s);
    let child = STRESS.then(|| {
        std::process::Command::new("stress")
            .args(["--cpu", "1"])
            .spawn()
            .unwrap()
    });
    std::thread::sleep(t);

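    // Curiously, the command appears to stay alive, and I have to
    // `pkill stress` once this program returns (possibly because `kill()`
    // only signals the parent `stress` process, not any workers it forks).
    if let Some(mut c) = child {
        let _ = c.kill();
        let _ = c.wait(); // reap the killed child process
    }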
}

#[tracing::instrument]
fn a(n: usize) {
    for _ in 0..n {
        // 'a' takes a little time to complete
        op(1);
    }
}

#[tracing::instrument]
fn b(n: usize) {
    (0..n)
        .into_par_iter()
        .map(|n| {
            // 'b' takes a long time to complete
            op(10);
            n * n
        })
        .sum::<usize>();
}

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt()
        .compact()
        .with_level(false)
        .with_span_events(tracing_subscriber::fmt::format::FmtSpan::CLOSE)
        .init();

    let mut tasks = Vec::new();
    let cores = std::thread::available_parallelism().unwrap().get();
    let sem = std::sync::Arc::new(tokio::sync::Semaphore::new(cores));
    let mul = 2;
    let now = std::time::Instant::now();

    for i in 0..cores * mul {
        let permit = sem.clone().acquire_owned().await.unwrap();
        tasks.push(
            tokio::task::spawn_blocking(move || {
                a(1);
                b(1);
                drop(permit);
            })
            .instrument(tracing::info_span!("main", i)),
        );
    }

    for t in tasks {
        // Surface a panic from the blocking task instead of silently dropping it
        t.await.unwrap();
    }

    println!("program finished in {:?}", now.elapsed());
    println!("expected time to complete: {}s", mul * cores);
}

I just can't explain why it works better in my app when I don't nest my calls to par_iter(), but it just does!

One of the places where removing the par_iter() calls removed the deadlock-like symptoms had a few levels of nested par_iter() calls, and ultimately it was splitting a string into graphemes and collecting them into a Vec of String (one for each grapheme). I also don't think it was preallocating the Vec with the right capacity. So a LOT of syscalls and allocations.

In single-threaded mode, while slow overall, the CPU load was constant. I am now collecting into a Vec whose capacity is declared up front, and collecting into SmolStr. I then added only a single call to par_iter() at the right place, and so far this deadlock-like situation seems to have disappeared (along with the code being significantly faster).

@cuviper @kpreid @Michael-F-Bryan, do you think heavy allocation across the different threads could cause these kinds of symptoms? Maybe the allocator was getting confused, or the OS was swamped with requests to park threads, make syscalls, etc. (Note: I am wildly speculating; this is all too low-level for me :smile:)

I think that in this kind of situation, speculation like "maybe it's allocation", by you or us, is not going to lead to a true answer.

The way to discover what the problem is is for you to reduce it — take your program and remove parts, until you have something which demonstrates the problem without having any unnecessary parts. Then it will be possible for you or us to look at this program and consider why those necessary parts are troublesome. It might still not be visible to inspection but it will be a better basis for debugging (or a bug report) if it is simpler.

It seems what you've discovered is that this is not a deadlock at all, but it sounds like there's a serial chokepoint. That might well be "collecting into a Vec of String", which is a whole lot of allocations as you mention, but also rayon collecting non-indexed iterators into Vec requires a fair amount of data movement to merge things together. But merely guessing will have you spinning your wheels a lot -- this is the time to learn how to use a profiler and see what is actually spending time in those periods.

Interesting. I've never heard the term "serial chokepoint" or even really the concept of a "chokepoint" in computer science. Can you recommend any gentle introductions on this topic?

Also, what is a (non-)indexed iterator?

Also, what is a (non-)indexed iterator?

This is a Rayon-specific concept: an IndexedParallelIterator vs. just a ParallelIterator. Indexed iterators are ones that come from sources like Vec and slices, meaning that it is known exactly how many items they have and they can be efficiently split into independent parts.

I don't think it's a technical term.

Basically, you've formed a chokepoint/bottleneck in your code where only one task can access a resource (the Vec<String> being collected into) at a time. The result is all the "parallel" code effectively running sequentially/serially because each task needs to wait until the previous task is done with the resource before making progress.

This is Amdahl's law in action.

In computer programming, Amdahl's law is that, in a program with parallel processing, a relatively few instructions that have to be performed in sequence will have a limiting factor on program speedup such that adding more processors may not make the program run faster.

Adding more parallelism in this case would typically make the problem worse because instead of (for example) 8 threads fighting over the Vec<String> lock, you could have 32 threads all fighting for access ("lock contention").
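
To put rough numbers on the Amdahl's law point, here is a minimal sketch. It assumes a fraction `p` of the run time parallelizes perfectly and the rest is strictly serial; the helper name `amdahl_speedup` and the 90% figure are made up for illustration and do not come from anyone's measurements in this thread.

```rust
/// Theoretical speedup under Amdahl's law: 1 / ((1 - p) + p / n),
/// where `p` is the parallelizable fraction of the run time and `n` is the
/// number of workers. Hypothetical helper, for illustration only.
fn amdahl_speedup(p: f64, n: u32) -> f64 {
    assert!((0.0..=1.0).contains(&p), "p must be within 0..=1");
    assert!(n > 0, "need at least one worker");
    1.0 / ((1.0 - p) + p / f64::from(n))
}

fn main() {
    // Assumed example: 90% of the run time parallelizes, 10% is serial.
    for n in [1, 8, 32, 1024] {
        println!("p = 0.90, n = {n:>4}: speedup {:.2}x", amdahl_speedup(0.90, n));
    }
}
```

With a 10% serial part, the speedup can never exceed 1 / (1 - 0.9) = 10x no matter how many threads are added, which is why shrinking the serial part usually pays off more than adding workers.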

could you give an example of one that is not "indexed"?

isn't string.graphemes(true).par_iter().collect::<Vec<String>>(); "indexed"?

is rayon's "indexed" nomenclature equivalent to std's ExactSizeIterator?

Anything from .par_bridge() or .flat_map() isn't indexed. I don't know what else might be; I'd have to scan the traits' list of implementations, same as you could.

IndexedParallelIterator is similar to ExactSizeIterator in that it needs to know how many items there are, but it also needs to have a producer that can be split at a specific index. You can see the list of built-in implementors, mostly things built on slices and ranges. Containers like HashMap do know how many items they have, but not where they are, so those parallel iterators are not indexed. Adapters like Map preserve the indexed capability, as it is mapped 1:1, but Filter does not since we can't know ahead of time which items will pass through.

For collecting a Vec, the indexed property is an important optimization, because it means we can pre-allocate the vector and split its memory 1:1 with the input iterator, and write the resulting items directly in place. When the iterator is not indexed, we have to collect separate chunks and then move them into a contiguous vector afterward.
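
The two collection strategies described above can be sketched with plain `std::thread::scope`. This is not Rayon's implementation, just a std-only illustration under assumptions: the function names `collect_in_place` and `collect_then_merge`, the chunking scheme, and the squaring/filtering workload are all invented for the example.

```rust
use std::thread;

/// "Indexed" style: the length is known up front, so the output is allocated
/// once and each worker writes straight into its own disjoint slice.
fn collect_in_place(input: &[u64], workers: usize) -> Vec<u64> {
    let workers = workers.max(1);
    let chunk = ((input.len() + workers - 1) / workers).max(1);
    let mut out = vec![0u64; input.len()];
    thread::scope(|s| {
        // Input and output are split at the same indices, 1:1.
        for (src, dst) in input.chunks(chunk).zip(out.chunks_mut(chunk)) {
            s.spawn(move || {
                for (d, x) in dst.iter_mut().zip(src) {
                    *d = x * x;
                }
            });
        }
    });
    out
}

/// "Non-indexed" style: each worker filters, so nobody knows the final length
/// or where each item will land. Workers build private Vecs, and a serial step
/// then moves them into one contiguous Vec.
fn collect_then_merge(input: &[u64], workers: usize) -> Vec<u64> {
    let workers = workers.max(1);
    let chunk = ((input.len() + workers - 1) / workers).max(1);
    let parts: Vec<Vec<u64>> = thread::scope(|s| {
        // Spawn every worker first, then join them in input order.
        let handles: Vec<_> = input
            .chunks(chunk)
            .map(|src| {
                s.spawn(move || {
                    src.iter()
                        .filter(|x| **x % 2 == 0)
                        .map(|x| x * x)
                        .collect::<Vec<u64>>()
                })
            })
            .collect();
        handles.into_iter().map(|h| h.join().unwrap()).collect()
    });
    // Serial merge: this is the extra data movement.
    let mut out = Vec::with_capacity(parts.iter().map(Vec::len).sum());
    for part in parts {
        out.extend(part);
    }
    out
}

fn main() {
    let input: Vec<u64> = (0..10).collect();
    println!("in place:   {:?}", collect_in_place(&input, 4));
    println!("then merge: {:?}", collect_then_merge(&input, 4));
}
```

The second function does strictly more copying and has a serial tail, which is the kind of cost that only shows up when the items are cheap to produce but numerous.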

I'm not sure where you're getting a parallel graphemes implementation, but it's probably not indexed. Rayon has a parallel Chars that is not indexed, because utf-8 strings only know their byte length, not the number of chars, and definitely not the position of all the character boundaries. To make it indexable, you would have to collect it into an indexable container first, in which case a further parallel collect is a waste.

I see! Now "indexed" makes much more sense! Thank you!

So, do you think that (typically) anywhere I have par_bridge(), I'd be better served with .collect::<Vec<_>>().into_par_iter()?

In principle, that will create an even worse “chokepoint”: instead of having a serial part of the overall data flow, you're actually waiting for the entire Vec to be collected from the first part before starting the second part at all.

In practice, maybe it might be useful. But don't bet on it, and if you do find it improves the program, then there ought to be a different change that's even better.

I'm not fond of par_bridge() in general, as it has some rather clunky drawbacks (like spinning CPU!) that I haven't figured out how to improve yet. It's OK when you really do want "streaming" behavior, fanning out items in parallel as they come in, but if you can collect relatively quickly up-front, then going parallel afterward will probably work better. If you're not sure, try it both ways and see how it goes. :slight_smile:
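
As a rough std-only analogy of the "streaming" versus "collect first, then go parallel" trade-off: the sketch below is not how Rayon implements `par_bridge()`, and the names `streaming_sum`, `collect_first_sum`, and `square` are hypothetical. It only assumes that a sequential iterator must be pulled from one item at a time, while a collected Vec can be split into independent chunks.

```rust
use std::sync::Mutex;
use std::thread;

/// Streaming fan-out: workers pull items one at a time from a shared
/// sequential iterator. Pulling is serialized by the lock; only `work`
/// itself runs in parallel.
fn streaming_sum<I>(iter: I, workers: usize, work: fn(u64) -> u64) -> u64
where
    I: Iterator<Item = u64> + Send,
{
    // `fuse()` makes it safe to keep calling `next()` after exhaustion.
    let shared = Mutex::new(iter.fuse());
    thread::scope(|s| {
        let handles: Vec<_> = (0..workers.max(1))
            .map(|_| {
                s.spawn(|| {
                    let mut local = 0u64;
                    loop {
                        // The lock guard is dropped at the end of this
                        // statement, before `work` runs.
                        let next = shared.lock().unwrap().next();
                        match next {
                            Some(x) => local += work(x),
                            None => break local,
                        }
                    }
                })
            })
            .collect();
        handles.into_iter().map(|h| h.join().unwrap()).sum::<u64>()
    })
}

/// Collect first: one serial pass fills a Vec, then workers each take an
/// independent chunk with no shared state between them.
fn collect_first_sum<I>(iter: I, workers: usize, work: fn(u64) -> u64) -> u64
where
    I: Iterator<Item = u64>,
{
    // Nothing parallel starts until this collect has finished.
    let items: Vec<u64> = iter.collect();
    let workers = workers.max(1);
    let chunk = ((items.len() + workers - 1) / workers).max(1);
    thread::scope(|s| {
        let handles: Vec<_> = items
            .chunks(chunk)
            .map(|part| s.spawn(move || part.iter().map(|&x| work(x)).sum::<u64>()))
            .collect();
        handles.into_iter().map(|h| h.join().unwrap()).sum::<u64>()
    })
}

/// Stand-in for the real per-item work.
fn square(x: u64) -> u64 {
    x * x
}

fn main() {
    println!("streaming:     {}", streaming_sum(1..=100, 4, square));
    println!("collect first: {}", collect_first_sum(1..=100, 4, square));
}
```

Both compute the same total. Which one finishes sooner depends on how expensive producing an item is relative to processing it, so timing both on real data is the only reliable guide.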

After some time with the new algorithm, the situation appears to be much better than before, but I'm still noticing two things. First, there are periods where the CPU is at full blast yet progress (in the broader scheme of the algorithm) is atypically slow compared to other times when the CPU is at full blast. Second, there are moments where the CPU dips to nothing, sometimes for 1-2 minutes, before it all frees up again and jumps back to 100%.

I'm curious about this "serial chokepoint". I'm sure I'm engaging in other bad practices that I'm not aware of. Are you aware of any resources, guides, or simply best practices for designing parallel algorithms?

Are there any tools for observability into rayon in order to identify what problems I might be experiencing?

Should I be thinking about FIFO and LIFO and maybe mixing and matching strategically? (am reading rfc0001-scope-scheduling now)

This could be par_bridge() spinning CPU, especially if you have multiple bridges running across the pool, because they don't cooperate with each other.

I don't have a guess here, but 1-2 minutes is a pretty wide window to fire up a profiler. If you're on Linux, a simple perf top should get you started to see what it's doing in that time. Other OSes have similar tools, but I'm not familiar enough to make recommendations.
