Hi! I'm the author of the pure Rust exr crate, and I'm revisiting my parallelization code. Basically, I have a sequential iterator whose elements should be processed in parallel and then collected at the end. Simple, right?
The Challenge
Here's the situation: the file contains blocks of compressed pixels. The blocks have to be read from the file sequentially, then decompressed in parallel, and finally each block is inserted into a pre-allocated image buffer.
Now, it should also be possible to process a very large image (larger than the available RAM) block by block, without ever allocating memory for the whole image at once. This adds a constraint: no more than X elements may be allocated at any one time while the elements are decompressed in parallel. Under no circumstances may they all be put into one gigantic buffer, because that allocation would fail.
Another special requirement: both the decompression and the insertion of blocks into the image may fail, and any error must be propagated correctly to the end user.
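For reference, the purely sequential version of this pipeline is trivial and uses bounded memory — here's a sketch with hypothetical placeholder types (`CompressedBlock`, `decompress`, `insert_pixels_or_err` are toy stand-ins, not the real crate API):

```rust
// Hypothetical placeholder types, just to sketch the sequential shape of the pipeline
struct CompressedBlock(Vec<u8>);
struct DecompressedBlock(Vec<u8>);

#[derive(Debug, PartialEq)]
enum Error { Io, Decompression, Insertion }

struct Image { pixels: Vec<u8> }

impl Image {
    // stand-in for placing a block at its offset; may fail in the real crate
    fn insert_pixels_or_err(&mut self, block: DecompressedBlock) -> Result<(), Error> {
        self.pixels.extend(block.0);
        Ok(())
    }
}

// stand-in for real decompression, which may fail
fn decompress(block: CompressedBlock) -> Result<DecompressedBlock, Error> {
    Ok(DecompressedBlock(block.0))
}

// read sequentially, decompress, insert — bounded memory, but no parallelism
fn build_image() -> Result<Image, Error> {
    let reader = (0..4u8).map(|i| Ok(CompressedBlock(vec![i; 8])));
    let mut image = Image { pixels: Vec::new() };
    for compressed_block_or_err in reader {
        image.insert_pixels_or_err(decompress(compressed_block_or_err?)?)?;
    }
    Ok(image)
}

fn main() {
    let image = build_image().expect("pipeline failed");
    assert_eq!(image.pixels.len(), 32);
    println!("inserted {} pixels", image.pixels.len());
}
```

The whole question is how to keep these properties (bounded memory, full error propagation) while making only the `decompress` step parallel.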
What I have tried so far
ParBridge + TryFold
Here's what my first attempt looked like when using rayon (pseudo):
let sequential_compressed_blocks_reader: impl Iterator<Item = Result<CompressedBlock>> = open_file(path); // reading a block may fail due to IO errors

let image: Result<Image> = sequential_compressed_blocks_reader.par_bridge()

    // decompress in parallel; returns Result<DecompressedBlock>, as decompression may fail
    .map(|compressed_block_or_err| decompress(compressed_block_or_err?))

    // if this chain returns Err at any point, try_fold_with aborts and returns the error (nice)
    .try_fold_with(pre_allocated_image, |mut image, decompressed_block_result| {
        image.insert_pixels_or_err(decompressed_block_result?)?; // inserting the block into the image may fail
        Ok(image) // return the updated image
    });
However, there's a problem: try_fold_with doesn't actually return one image. Internally, it clones the whole pre-allocated image buffer for each worker and writes a few blocks into each clone, so try_fold yields multiple image clones. These would then have to be reduced into one final image in an additional, complicated call to try_reduce_with. This doesn't seem like a good approach to me, as it involves a ton of allocations, a lot of bookkeeping to remember which clone contains which pixels, and a ton of copy operations just to merge the final images.
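To illustrate the cost with plain std (toy types and a fake "worker per chunk" — this is not the real rayon code, just the clone-and-merge pattern it would conceptually produce):

```rust
// Each "worker" folds into its own full-size clone of the image buffer,
// and all partial images must be merged back into one at the end.
fn fold_and_merge() -> Vec<u32> {
    let blocks: Vec<u32> = (0..8).collect();
    let empty_image = vec![0u32; 8]; // the pre-allocated buffer

    // what try_fold_with produces conceptually: one partial image per worker
    let partial_images: Vec<Vec<u32>> = blocks
        .chunks(4) // pretend each chunk is handled by one worker
        .map(|chunk| {
            let mut partial = empty_image.clone(); // a full-size clone per worker!
            for &block in chunk {
                partial[block as usize] = block + 1;
            }
            partial
        })
        .collect();

    // the try_reduce_with step: copy every partial image into one final image
    let mut image = empty_image;
    for partial in partial_images {
        for (dst, src) in image.iter_mut().zip(partial) {
            if src != 0 { *dst = src; } // bookkeeping: which clone holds which pixels?
        }
    }
    image
}

fn main() {
    assert_eq!(fold_and_merge(), vec![1, 2, 3, 4, 5, 6, 7, 8]);
    println!("merged: {:?}", fold_and_merge());
}
```

For a gigabyte-sized image, every one of those clones and the final merge pass is real memory and real copying — which is exactly what the memory constraint forbids.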
TryFold + Send
I have tried to solve this by folding over a cloneable sender instead of folding the image itself:
let sequential_compressed_blocks_reader: impl Iterator<Item = Result<CompressedBlock>> = open_file(path); // reading a block may fail due to IO errors

// no more than 12 pixel blocks in the buffer at once
let (decompressed_block_sender, decompressed_block_receiver) = std::sync::mpsc::sync_channel(12);

sequential_compressed_blocks_reader.par_bridge()

    // decompress in parallel; returns Result<DecompressedBlock>, as decompression may fail
    .map(|compressed_block_or_err| decompress(compressed_block_or_err?))

    // if this chain returns Err at any point, try_fold_with aborts and returns the error (nice)
    .try_fold_with(decompressed_block_sender, |sender, decompressed_block_result| {
        // if decompression failed, return the error; otherwise send the decompressed block towards the image
        decompressed_block_result.map(|decompressed_block| sender.send(decompressed_block).expect("thread error"))?;
        Ok(sender)
    })?;

// try inserting the blocks into the image
for decompressed_block in decompressed_block_receiver {
    pre_allocated_image.insert_pixels_or_err(decompressed_block)?;
}
// try inserting the blocks into the image
for decompressed_block in decompressed_block_receiver {
pre_allocated_image.insert_pixels_or_err(block)?;
}
This worked, but as I realized just a few days ago, it actually buffers all the blocks! When try_fold_with(...)? is called, the decompression starts in parallel, but the ? then waits until every element is known to be ok before it can return. This means all elements are held in the channel until the whole fold has finished; only then does the for loop at the end insert all the buffered blocks into the pre-allocated image. Not good.
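The underlying mechanism can be reproduced with std alone (the 12 is just the capacity from my pseudo-code above): a bounded sync_channel only makes room when someone actually receives, and in the code above nobody receives until the fold has fully returned.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

// A bounded sync_channel applies backpressure: once the buffer holds `cap`
// blocks, further sends can no longer complete. If the receiving for-loop
// only starts after try_fold_with has returned, nothing drains the channel
// while the workers are still producing.
fn channel_full_after(cap: usize) -> bool {
    let (sender, _receiver) = sync_channel::<u32>(cap);
    for i in 0..cap {
        sender.try_send(i as u32).expect("fits into the buffer");
    }
    // the next send cannot complete until the receiver drains a block
    matches!(sender.try_send(99), Err(TrySendError::Full(_)))
}

fn main() {
    assert!(channel_full_after(12));
    println!("channel is full after 12 sends");
}
```

So the receiving loop has to run concurrently with the fold — which is what the next attempt tries.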
TryFold + Send + Async Collector Thread
Today, I had another attempt:
let sequential_compressed_blocks_reader: impl Iterator<Item = Result<CompressedBlock>> = open_file(path); // reading a block may fail due to IO errors

// no more than 12 pixel blocks in the buffer at once
let (decompressed_block_sender, decompressed_block_receiver) = std::sync::mpsc::sync_channel(12);

// without blocking, receive decompressed blocks and insert them into the image (could use async?)
let image_collector_thread = thread::spawn(move || {
    let mut image = pre_allocated_image;

    // try inserting the blocks into the image;
    // this iterator simply finishes when the sender terminates (no matter if by error or panic)
    for decompressed_block in decompressed_block_receiver {
        image.insert_pixels_or_err(decompressed_block)?;
    }

    Ok(image)
});

// block this thread while decompressing in parallel
let ok_or_decompression_error = sequential_compressed_blocks_reader.par_bridge()

    // decompress in parallel; returns Result<DecompressedBlock>, as decompression may fail
    .map(|compressed_block_or_err| decompress(compressed_block_or_err?))

    // if this chain returns Err at any point, try_fold_with aborts and returns the error (nice)
    .try_fold_with(decompressed_block_sender, |sender, decompressed_block_result| {
        // if decompression failed, return the error; otherwise send the decompressed block towards the image.
        // but if an error happens while the pixels are inserted into the image, sending fails, this panics, and the real error is lost, right?
        decompressed_block_result.map(|decompressed_block| sender.send(decompressed_block).expect("cannot send block"))?;
        Ok(sender)
    });

// this only reports errors that happen while reading from the file or decompressing the pixels.
// if decompression errors, this returns immediately; the sender is dropped, which ends the collector thread
ok_or_decompression_error?;

// if necessary, wait until all decompressed blocks have been inserted.
// this only reports errors that happen while pixels are inserted into the pre-allocated image
let collected_image_or_insertion_error = image_collector_thread.join();
Ok(collected_image_or_insertion_error)
The problem with this is that sending may fail and trigger expect("cannot send block") when the receiver terminates — which happens when there is an error while inserting a pixel block into the image. If that happens, try_fold_with panics, the original insertion error is lost, and the process does not abort gracefully.
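To illustrate the failure mode (and the kind of graceful propagation I'm after), here's a std-only sketch with toy types and plain threads instead of rayon — a failed send is mapped to a sentinel error instead of panicking, and the collector's real error takes priority afterwards. Whether this translates cleanly into try_fold_with is exactly what I'm unsure about:

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

#[derive(Debug, PartialEq)]
enum Error { Insertion, Aborted }

// Sketch: map a failed send to a sentinel `Aborted` error instead of
// panicking, then prefer the collector thread's real error at the end.
fn run_pipeline() -> Result<(), Error> {
    let (sender, receiver) = sync_channel::<u32>(4);

    // collector thread: insertion fails at block 3, which drops the receiver
    let collector = thread::spawn(move || -> Result<(), Error> {
        for block in receiver {
            if block == 3 {
                return Err(Error::Insertion); // the "real" error
            }
        }
        Ok(())
    });

    // producer: a send only fails after the receiver was dropped,
    // so return a sentinel error instead of panicking
    let sent: Result<(), Error> = (0..100)
        .try_for_each(|block| sender.send(block).map_err(|_| Error::Aborted));
    drop(sender); // hang up so the collector's for-loop can finish

    let inserted = collector.join().expect("collector thread panicked");

    // the collector's error wins over the sentinel send error
    match (inserted, sent) {
        (Err(real_error), _) => Err(real_error),
        (Ok(()), Err(aborted)) => Err(aborted),
        (Ok(()), Ok(())) => Ok(()),
    }
}

fn main() {
    assert_eq!(run_pipeline(), Err(Error::Insertion));
    println!("propagated: {:?}", run_pipeline());
}
```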
Question
It seems to me that there should be a simpler way to do all this. Can anyone point me to some simple parallelization mechanisms or other approaches that achieve parallel decompression without excessive allocation? Or maybe there's only a small detail missing in one of my previous approaches, which could easily be made to work?
Is this a perfect use case for async Rust and futures?
Thanks for your time!