What is the correct parallelization approach if you do not want to allocate all parallelized items?

Hi! I'm the author of the pure Rust exr crate, and I'm revisiting my parallelization code. Basically, I have a sequential iterator, each element should be processed in parallel, and the elements should be collected in the end. Simple, right? :smiley:

The Challenge

The reason it isn't quite that simple is this: The file contains blocks of pixels, which are compressed. The blocks should be read from the file sequentially, then decompressed in parallel, and in the end each block is inserted into a pre-allocated image buffer.

Now, it should also be possible to process a very large image (larger than the available RAM) block by block, without ever allocating the memory for the whole image at once. This means that there's an additional constraint: no more than X elements should be allocated at once while decompressing each element in parallel. They shall under no circumstances all be put into a gigantomatic buffer, because that allocation will fail.

Another special requirement is that the decompression as well as the insertion of blocks into the image may fail and the error must be propagated to the end-user correctly.

What I have tried so far


ParBridge + TryFold

Here's what my first attempt looked like when using rayon (pseudo):

let sequential_compressed_blocks_reader: impl Iterator<Item=Result<CompressedBlock>> = open_file(path); // reading a block may fail due to IO errors

let image: Result<Image> = sequential_compressed_blocks_reader.par_bridge()

    // decompress in parallel
    .map(|compressed_block_or_err| decompress(compressed_block_or_err?)) // returns Result<DecompressedBlock> as decompression may fail
    
     // if this whole chain returns Err at any point, try_fold_with will abort and return the error (nice)
    .try_fold_with(pre_allocated_image, |mut image, decompressed_block_result|{
       image.insert_pixels_or_err(decompressed_block_result?)?; // inserting the block into the image may fail
       Ok(image) // return the updated image
    });

However, there's a slight problem: try_fold_with doesn't actually return the image. It will internally clone the whole pre-allocated image buffer, and write a few blocks to each clone. try_fold_with returns these multiple image buffer clones, which would then have to be reduced into one final image in an additional, complicated call to try_reduce_with. This doesn't seem like a good approach to me, as it involves a ton of allocations, a lot of bookkeeping to remember which image contains which pixels, and a ton of copy operations just to merge the final images.
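To illustrate the shape rayon expects here, a minimal sketch with plain integers (not the actual exr types) might look like this:

use rayon::prelude::*;

// minimal sketch with plain integers instead of the exr types: try_fold_with
// clones the init value for every rayon job (cheap for an i32, but the whole
// image buffer in my case), and the per-worker partial results still have to
// be merged with try_reduce afterwards
fn parallel_checked_sum(numbers: Vec<i32>) -> Result<i32, String> {
    numbers.into_par_iter()
        .try_fold_with(0i32, |sum, number| {
            if number < 0 { Err(format!("negative number: {}", number)) }
            else { Ok(sum + number) }
        })
        // every worker produced its own partial sum; they still need to be reduced
        .try_reduce(|| 0, |a, b| Ok(a + b))
}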


TryFold + Sender

I have tried to solve this problem by folding over a cloneable sender instead of folding the image itself:

let sequential_compressed_blocks_reader: impl Iterator<Item=Result<CompressedBlock>> = open_file(path); // reading a block may fail due to IO errors

let (decompressed_block_sender, decompressed_block_receiver) = std::sync::mpsc::sync_channel(12); // no more than 12 pixel blocks in the buffer at once

sequential_compressed_blocks_reader.par_bridge()

    // decompress in parallel
    .map(|compressed_block_or_err| decompress(compressed_block_or_err?)) // returns Result<DecompressedBlock> as decompression may fail
    
     // if this whole chain returns Err at any point, try_fold_with will abort and return the error (nice)
    .try_fold_with(decompressed_block_sender, |sender, decompressed_block_result| {
        // if decompression failed, return the error; if ok, send the decompressed block towards the image
        decompressed_block_result.map(|decompressed_block| sender.send(decompressed_block).expect("thread error"))?;
        Ok(sender)
    })?;

// try inserting the blocks into the image
for decompressed_block in decompressed_block_receiver {
    pre_allocated_image.insert_pixels_or_err(decompressed_block)?;
}

This worked, but as I realized just a few days ago, this actually allocates all the blocks! When calling try_fold_with(...)?, the decompression starts in parallel, but then it waits until it knows whether any error occurred, so that it can return Ok(()) in the end. This means that all elements are buffered internally until the last element is known to be ok. Then, in one go, the for loop at the end inserts all the buffered blocks into the pre-allocated image. Not good.


TryFold + Sender + Async Collector Thread

Today, I had another attempt:

let sequential_compressed_blocks_reader: impl Iterator<Item=Result<CompressedBlock>> = open_file(path); // reading a block may fail due to IO errors

let (decompressed_block_sender, decompressed_block_receiver) = std::sync::mpsc::sync_channel(12); // no more than 12 pixel blocks in the buffer at once

// without blocking, receive decompressed blocks and insert into the image (could use async?)
let image_collector_thread = thread::spawn(move || {
    let mut image = pre_allocated_image;

    // try inserting the blocks into the image
    // this iterator will simply finish when the sender terminates (no matter if error or panic)
    for decompressed_block in decompressed_block_receiver {
        image.insert_pixels_or_err(decompressed_block)?;
    }

    Ok(image)
});

// block this thread while decompressing in parallel
let ok_or_decompression_error = sequential_compressed_blocks_reader.par_bridge()

    // decompress in parallel
    .map(|compressed_block_or_err| decompress(compressed_block_or_err?)) // returns Result<DecompressedBlock> as decompression may fail
    
     // if this whole chain returns Err at any point, try_fold_with will abort and return the error (nice)
    .try_fold_with(decompressed_block_sender, |sender, decompressed_block_result| {
        // if decompression fails, return the error. if ok, send the decompressed block towards the image
        // if an error happens while the pixels are inserted into the image, sending will fail, this panics, and the real error is lost, right?
        decompressed_block_result.map(|decompressed_block| sender.send(decompressed_block).expect("cannot send block"))?;
        Ok(sender)
    });


// this will only report errors that happen while reading from file and decompressing the pixels
// if decompressing errors, this returns immediately, the sender is closed, which ends the collector thread
ok_or_decompression_error?;

// if necessary, wait until all decompressed blocks have been inserted 
// will only report errors that happen when pixels are inserted into the pre-allocated image
let collected_image_or_insertion_error = image_collector_thread.join().expect("collector thread panicked");
Ok(collected_image_or_insertion_error?)

The problem with this is that sending may fail and trigger the expect("cannot send block") when the receiver terminates, which happens when there is an error while inserting a pixel block into the image. If that happens, the try_fold_with will panic, the original insertion error is lost, and the process does not abort gracefully.
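For what it's worth, here is a self-contained toy sketch of the pattern I'm aiming for, where a failed send is treated as "stop producing" and the collector's error is recovered from join() afterwards (toy u32 values stand in for the real blocks and image):

use std::sync::mpsc;
use std::thread;

// sketch only: the producer stops quietly when the receiver hangs up, and the
// collector thread's own error is recovered through join() instead of being
// lost in a panic
fn pipeline() -> Result<Vec<u32>, String> {
    let (sender, receiver) = mpsc::sync_channel::<u32>(12);

    // collector: fails on purpose when it sees the value 3
    let collector = thread::spawn(move || -> Result<Vec<u32>, String> {
        let mut out = Vec::new();
        for value in receiver {
            if value == 3 {
                return Err("inserting block 3 failed".to_string());
            }
            out.push(value);
        }
        Ok(out)
    });

    // producer: a closed channel is not an error here, just a signal to stop
    for value in 0..10 {
        if sender.send(value).is_err() {
            break;
        }
    }
    drop(sender); // let the collector's loop finish

    collector.join().expect("collector thread panicked")
}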


Question

It seems to me like there should be a simpler way to do all this. Can anyone point me to some simple parallelization mechanisms or other approaches to achieve the parallel decompression without excessive allocation? Or maybe there's only a small detail missing in my previous approaches that can be made to work easily?

Is this a perfect use case for async/futures in Rust?

Thanks for your time :slight_smile:

I would probably roll my own a bit more.

let (send, recv) = std::sync::mpsc::channel();
let mut currently_running = 0;

for chunk in chunks {
    while currently_running >= MAX {
        let answer = recv.recv().unwrap(); // blocks until one worker finishes
        process_answer(answer);
        currently_running -= 1;
    }

    let send = send.clone();
    currently_running += 1;
    rayon::spawn(move || {
        let answer = compute(chunk);
        send.send(answer).unwrap();
    });
}

while currently_running > 0 {
    let answer = recv.recv().unwrap();
    process_answer(answer);
    currently_running -= 1;
}

If you need the chunks in order, you can use a BinaryHeap to store the extras while you still have some holes in your current collection of completed chunks. Of course, you can also use the size of the heap as another condition in the inner while loop to wait if one hole is taking a lot longer than the others.

This still uses rayon, but only for managing the threadpool.
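If it helps, a minimal sketch of that reordering idea might look like this (handle_in_order is a hypothetical stand-in for inserting a block into the image; the results arrive tagged with their original index, possibly out of order):

use std::cmp::Reverse;
use std::collections::BinaryHeap;

// sketch only: a min-heap holds the "extras" until the next expected index arrives
fn process_in_order(
    results: impl Iterator<Item = (usize, String)>,
    mut handle_in_order: impl FnMut(usize, String),
) {
    let mut next_index = 0;
    let mut pending = BinaryHeap::new(); // min-heap via Reverse

    for (index, value) in results {
        pending.push(Reverse((index, value)));

        // drain everything that is now contiguous with what was already handled
        while pending.peek().map_or(false, |Reverse((i, _))| *i == next_index) {
            let Reverse((index, value)) = pending.pop().unwrap();
            handle_in_order(index, value);
            next_index += 1;
        }
    }
}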


Thanks a lot for your input! <3 Managing it "manually" looks easier than I imagined. It might work perfectly.

I think using Futures might be advantageous later on, as I hope it allows for a different architecture (pull-based external iteration instead of internal folding). As that seems a little more involved, I'm definitely trying your suggestion first. I'll implement it, see if it works for me, and then come back to report on the status.

Futures are mainly meant for IO-bound stuff. While you could use them for this kind of thing, rayon is the normal tool for this kind of task.


Ok thanks :slight_smile:

Well I got into trouble when trying to implement your suggestion in the real code base.

I forgot to mention that the spawned threads must borrow some outer variable immutably. To avoid leaving out more details, here is the complete raw code I'm trying to get working:

        let meta_data: MetaData = self.meta_data().clone(); // cloning once is okay

        let (send, recv) = std::sync::mpsc::channel(); // TODO crossbeam?
        let mut currently_running = 0;
        let mut image = initial;

        let pool = rayon::ThreadPoolBuilder::new().build().expect("thread error");

        for chunk in chunks {
            while currently_running >= 12 {
                let answer = recv.recv().expect("thread error")?;
                image = insert_block(image, answer)?;
                currently_running -= 1;
            }

            let send = send.clone();
            currently_running += 1;

            pool.spawn(move || {
                let answer = UncompressedBlock::decompress_chunk(chunk, &meta_data, pedantic);
                send.send(answer).expect("thread error");
            });
        }

        while currently_running > 0 {
            let answer = recv.recv().expect("thread error")?;
            image = insert_block(image, answer)?;
            currently_running -= 1;
        }

        Ok(image)

How can I borrow local variables? With pool.scope()? Will I have to clone the meta data (expensive)?

The issue with sharing data between threads is that the data must have a 'static lifetime.

There are two approaches to this:

  • Store the data in an Arc, cloning this Arc for every thread in the pool (this is cheap). This is probably the simplest approach; see the sketch after this list.
  • Use an abstraction called a "scoped thread", which ensures the spawned thread ends before the referenced variables are dropped. Rayon (as well as other crates) provides such a thing.
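A minimal sketch of the Arc option, with a placeholder MetaData type (not your actual struct):

use std::sync::Arc;
use std::thread;

struct MetaData { width: usize, height: usize }

// every worker gets its own clone of the Arc, which only bumps a reference
// count instead of cloning the (potentially large) data itself
fn spawn_workers(meta_data: MetaData) {
    let meta_data = Arc::new(meta_data);

    let workers: Vec<_> = (0..4).map(|id| {
        let meta_data = Arc::clone(&meta_data); // cheap
        thread::spawn(move || {
            println!("worker {} sees a {}x{} image", id, meta_data.width, meta_data.height);
        })
    }).collect();

    for worker in workers {
        worker.join().expect("worker panicked");
    }
}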

I tried the scoped thread pool but I didn't understand it at first. Now I will attempt this:


        let meta_data: &MetaData = self.meta_data();
        let chunks = self.into_iter();

        rayon::scope(move |scope|{
            let meta_data = meta_data; // borrow the meta data from above

            let (send, recv) = std::sync::mpsc::channel(); // TODO crossbeam?
            let mut currently_running = 0;
            let mut image = initial;

            for chunk in chunks {
                while currently_running >= 12 {
                    let answer = recv.recv().expect("thread error")?;
                    image = insert_block(image, answer)?;
                    currently_running -= 1;
                }

                let send = send.clone();
                currently_running += 1;

                scope.spawn(move |_scope| {
                    let answer = chunk.and_then(|chunk| UncompressedBlock::decompress_chunk(chunk, meta_data, pedantic));
                    send.send(answer).expect("thread error");
                });
            }

            while currently_running > 0 {
                let answer = recv.recv().expect("thread error")?;
                image = insert_block(image, answer)?;
                currently_running -= 1;
            }

            Ok(image)
        })

Unfortunately, this means that now everything needs to be sendable... well okay

That's table stakes -- the very purpose of Send is making sure that your types are safe to send to other threads, which is a basic necessity for parallelism. In some cases, you might be able to break that down to a sendable "seed" to create non-sendable data local to each thread. Either way, let the type system do its job so the compiler protects you! :slight_smile:
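For instance, a small sketch of that "sendable seed" idea with toy types: the Vec<u8> seed is Send, while the Rc-based structure built from it never has to leave the thread it was created on.

use std::rc::Rc;
use std::thread;

fn spawn_worker(seed: Vec<u8>) -> thread::JoinHandle<()> {
    thread::spawn(move || {
        let local_data: Rc<Vec<u8>> = Rc::new(seed); // !Send, but purely thread-local
        println!("worker owns {} bytes", local_data.len());
    })
}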

Where exactly is the error coming from?

This all sounds too complicated. The basic challenge seems to be such an awkwardly parallel problem.

        let meta_clone = self.meta_data().clone();

        self
            .par_bridge()
            .map(move |compressed_chunk| UncompressedBlock::decompress_chunk(compressed_chunk?, &meta_clone, pedantic))
            .collect::<Result<Vec<_>>>()?.into_iter()   // FIXME DO NOT allocate all at once
            .try_fold(image, insert_block_into_image_or_err)

All I want to do is get rid of this temporary collect statement. Is there really no easy way in rayon to do this?! Why is it so complicated to not allocate all the elements of an iterator at once (which is the point of iterators in the first place)?

That's table stakes [..] Either way, let the type system do its job so the compiler protect you!

The above code, which can handle borrowing and fail-fast error handling without requiring anything to be sendable, is working... But removing the huge allocation suddenly requires everything to be sendable? Why should this be necessary?

Edit: Wait! par_bridge does require the iterator to be Send! Please forgive my frustrated rant.

I mean, it would be a pretty major problem if rayon didn't require Send.

Maybe I'm way off (I couldn't get something close to your code snippets running on the playground in reasonable time), but couldn't you just leave out the collect line and have

.try_fold(image, |image, block| insert_block_into_image_or_err(image, block?))

as the last line? It's not exactly the same functionality, but "inspecting all elements for an error before stuffing them into image" might indeed require an allocation, whereas "stuffing elements into image until you meet an error" might not.

I tried that, but unfortunately, the fold operation in rayon is not as simple as I initially thought. Because multiple threads fold in parallel, the entire pre-allocated image will internally be cloned by rayon. The fold operation doesn't return an image; it returns multiple image clones. These would then have to be reduced to obtain a complete image, meaning we would have to merge all the cloned images into one (very complicated).

I will attempt using Arc now, as with a scoped threadpool I would have to require 12+ traits and 12+ associated types to be sendable, which would be ... unfortunate

If your type is not Send then Arc<T> won't be Send. So using Arc here will not make this work.

For example you cannot put a type containing Rc into an Arc to send it to another thread:

use std::sync::Arc;
use std::rc::Rc;
use std::thread;

struct Foo {
    inner: Rc<u8>
}

fn bar() {
    let foo = Foo { inner: Rc::new(42) };
    let arc_foo = Arc::new(foo);

    thread::spawn(move || {
        drop(arc_foo);
    });
}
error[E0277]: `Rc<u8>` cannot be shared between threads safely
   --> src/lib.rs:13:5
    |
13  |     thread::spawn(move||{
    |     ^^^^^^^^^^^^^ `Rc<u8>` cannot be shared between threads safely
    |
    = help: within `Foo`, the trait `Sync` is not implemented for `Rc<u8>`
    = note: required because it appears within the type `Foo`
    = note: required because of the requirements on the impl of `Send` for `Arc<Foo>`
    = note: required because it appears within the type `[closure@src/lib.rs:13:19: 15:6]`

I'm guessing your type contains something like an Rc or a raw pointer, which are common things that tend to make structs !Send.

Types like that are not threadsafe and this is why, as @cuviper points out, the compiler stops you here. If it didn't, you could be debugging mysterious concurrency bugs right now. Even worse, you might not notice any issues and ship this to production. Or, you (or someone else) in the future might change how your struct works and unknowingly introduce a concurrency bug. This is what "Fearless concurrency" is all about.

I would have to require 12+ traits and 12+ associated types to be sendable, which would be ... unfortunate

If it is Rc making your struct !Send, you can just replace any uses of it with Arc to make this work. What is this struct, actually?

No no, that's not the problem. It's a complex architecture. The object in the Arc is sendable; it's a plain struct. But there are also other objects involved, mainly trait objects with type parameters, and adding Send to one trait would have to cascade down to lots of other traits. I can share the whole source code if you like; it's complicated. I can't give you a concise description, as it would be even more complicated than the source code haha

The following actually compiles; I'm not sure whether it works, because I have not integrated it into the whole system yet:


    fn decompress_parallel(
        mut self, pedantic: bool,
        mut insert_block: impl /*Send + */FnMut(UncompressedBlock) -> UnitResult
    ) -> UnitResult
        // where Self: Send
    {
   
        let meta_data_arc = Arc::new(self.meta_data().clone());

        let pool = rayon::ThreadPoolBuilder::new().build().expect("thread error");
        // rayon::scope(move |scope|{
            
            let (send, recv) = std::sync::mpsc::channel(); // TODO crossbeam?
            let mut currently_running = 0;

            for chunk in self {
                while currently_running >= 12 {
                    let decompressed = recv.recv().expect("thread error")?;
                    insert_block(decompressed)?;
                    currently_running -= 1;
                }

                let send = send.clone();
                let meta_data_arc = meta_data_arc.clone();

                currently_running += 1;

                pool.spawn(move || {
                    let decompressed = chunk.and_then(|chunk| UncompressedBlock::decompress_chunk(chunk, &meta_data_arc, pedantic));
                    send.send(decompressed).expect("thread error");
                });
            }

            while currently_running > 0 {
                let decompressed = recv.recv().expect("thread error")?;
                insert_block(decompressed)?;
                currently_running -= 1;
            }

            Ok(())
        // })
    }

With the Arc, the rest of the objects do not have to be sendable, because by using the Arc I don't have to use the scope anymore, which was the only reason any object would have had to be sendable. I didn't want to use Arc because it seems like a simple borrow should also work, but it's not that simple.

The types are not simple structs containing Rc; they are traits. These are exposed by the library, which means that any library user could put anything in there, including non-sendable things. It would be possible to make them Send, but it would require me to also make a whole lot of other traits sendable, which is too much work if I can use a simple Arc instead.

No no, that's not the problem.

In a way that is the problem, because currently someone can implement your traits in a way that is not threadsafe, which is why the compiler rejects your program.

These are exposed by the library, which means that any library user could want to put anything in there, including non-sendable things

Requiring Send on your traits would be the correct thing to do here. It's actually not that restrictive; the vast majority of primitives are Send, as are structs built exclusively from them.
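For illustration, a minimal sketch of what that could look like, with hypothetical trait and type names (not the actual exr API):

// either require Send as a supertrait...
trait BlockConsumer: Send {
    fn insert_block(&mut self, block_index: usize);
}

// ...or leave the trait alone and add the bound only where the trait object is
// actually stored and sent across threads:
struct ParallelReader {
    consumer: Box<dyn FnMut(usize) + Send>,
}

The second form keeps the trait itself unchanged and only constrains the parallel code path.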

Anyways, thanks for your input so far, folks. I tinkered a little and wondered if you think the following would also work:

@alice

Returns an iterator yielding blocks, decompressing in parallel. Calling next first spawns a number of threads and then waits for the next result, which is then returned.

(send, receiver) = channel()
receiver = receiver.into_iter()
blocks = file_contents.blocks_iter()
num_compressing = 0

return iter from_fn {
  while num_compressing < 12
    block = blocks.next()
    if block is none: break loop
    num_compressing += 1
    spawn (send (decompress block))

  pixels = receiver.next() // awaits next
  if pixels is some: num_compressing -= 1
  pixels
}

Any obvious mistake in there?

Is there a special case I didn't account for?
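For reference, a rough Rust sketch of that pseudocode, with toy Vec<u8> blocks, a placeholder for the real decompression, and ordering/error handling left out:

use std::sync::mpsc;

// sketch only: up to 12 jobs are kept in flight, and next() blocks on the
// channel for the next finished block
fn decompressed_blocks(
    mut compressed_blocks: impl Iterator<Item = Vec<u8>> + 'static,
) -> impl Iterator<Item = Vec<u8>> {
    let (sender, receiver) = mpsc::sync_channel::<Vec<u8>>(12);
    let mut num_compressing = 0;

    std::iter::from_fn(move || {
        // top up the pipeline with new decompression jobs
        while num_compressing < 12 {
            match compressed_blocks.next() {
                None => break,
                Some(block) => {
                    num_compressing += 1;
                    let sender = sender.clone();
                    rayon::spawn(move || {
                        let decompressed = block; // placeholder for the real decompression
                        let _ = sender.send(decompressed);
                    });
                }
            }
        }

        if num_compressing == 0 {
            return None; // nothing in flight and the input is exhausted
        }

        let block = receiver.recv().ok()?; // wait for the next finished block
        num_compressing -= 1;
        Some(block)
    })
}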

Your suggestion and also my iterator adaptation of your code both result in exit code 0xc0000409, STATUS_STACK_BUFFER_OVERRUN.

This seems to be an error code for when a thread allocates too much memory. I have tried using a custom thread pool with each thread having a stack size of 1 GB, but the error persisted. I have tried setting the thread count to 1; it still errors. Using a synchronous version with a vector (push and pop on a Vec instead of send and receive, without any threads) works fine, so the algorithm itself should not be the problem. As the sequential versions do not produce this error, I assume it somehow occurs due to the multithreading.

Do any of you have an idea why this error occurs? How could I possibly debug this? Googling the error code did not yield any useful results for me, only bug reports for games haha