Parallelizing SSD reads and subsequent computations

TLDR

Have you any experience or advice relating to parallelizing reading data from SSDs?

Details

  • A typical dataset I have to deal with consists of 100 files occupying around 2G each.

  • Each file typically contains 1 million items.

  • Each item needs to be transformed into something else. These transformations are fallible, with typical failure rates of the order of 70%. Failed items are filtered from the output set. The transformations are completely independent of each other, so the problem is embarrassingly parallel.

  • All transformed items need to be written out to a single file. The order does not matter.

  • In the era of spinning disks, I always assumed that parallelizing reads would be counterproductive; in the era of SSDs I expect that there may be significant benefits, but I have no practical experience with this whatsoever (other than seeing things like dust run multiple processes concurrently).

Can you offer any words of wisdom on how to approach parallelizing this process?

[Aside: the input and output files are HDF5, and I suspect that it might be worthwhile replacing this with something in the Apache Arrow ecosystem, but that's a topic for another day!]

I found a nice answer on SO about HDF5's parallel and thread-safe built-in capabilities:

AFAIR (backed up by the answer I linked) you need MPI for --enable-parallel reads/writes:

Now, for parallel access, you might want to use "Parallel HDF5" but those features requires using MPI.

Which is probably too much overhead for your use-case.

But you could use the thread-safe version of libhdf5 to read/write your datasets from multiple threads.

Now when it comes to Rust's support for hdf5, there is the hdf5 crate, which provides you with a thread-safe wrapper around libhdf5, regardless of whether thread-safety is enabled in libhdf5 or not:

Thread-safety with non-threadsafe libhdf5 builds guaranteed via reentrant mutexes.

Yes, I'd rather not have to add MPI to the infinite list of things I have to think about on this project.

I was hoping for a first-order solution that does not depend on HDF5 specifics, because

  • I have 100 separate input files, so there's plenty of potential for parallelism regardless of what format these files use;
  • I'm considering migrating away from HDF5 (or providing an alternative, at least).

I already use the hdf5 crate you mentioned. At the moment I just collect the whole output dataset in memory before writing it out in one go. Not ideal, I'd prefer to stream it out, but the hardware on which this runs has enough RAM to take this in its stride, so other improvements have much higher priority.

I think that the crucial point is: there's a read:write compression ratio in excess of 10:1, so I expect the biggest gains to be made by parallelizing the reads and the transformations, rather than the writes. The reads involve many different files, so I hope to find significant gains without having to depend on the specific details of the format of these files. By then the bottleneck will probably be elsewhere, in which case I can continue to ignore the details of the format for a little longer.

I don't know about hdf5, so I'll just talk about IO basics.

It depends on your read patterns. A single CPU core can be sufficient to slurp in gigabytes per second (assuming NVMe) if you issue reads in sufficiently large blocks and queue them up ahead of time, so that the IO queue depth (QD) doesn't drop to 0. Run a disk benchmark to see what the upper limit is. So depending on your data processing pipeline you could have one IO thread that reads the data in chunks, fans those chunks out to compute threads, and then fans back in on a writer thread. If the records aren't fixed-size, one could throw in another thread for parsing before the fan-out.
You can also start reading the next file while the previous one is still being processed. Processing one file at a time and waiting for each to complete before starting the next is one of the things that makes stuff slow.
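Here is a minimal sketch of that 1-reader / N-workers / 1-writer shape, assuming the crossbeam-channel crate; the file names, block size and the (identity) transform are placeholders:

```rust
use std::fs::File;
use std::io::{Read, Write};
use std::thread;

fn main() -> std::io::Result<()> {
    // Bounded channels limit how many chunks are in flight (and thus memory use).
    let (chunk_tx, chunk_rx) = crossbeam_channel::bounded::<Vec<u8>>(16);
    let (out_tx, out_rx) = crossbeam_channel::bounded::<Vec<u8>>(16);

    // Single IO thread: read each file in large blocks and fan the blocks out.
    let reader = thread::spawn(move || -> std::io::Result<()> {
        for path in ["in-000.dat", "in-001.dat"] {      // placeholder file list
            let mut f = File::open(path)?;
            loop {
                let mut buf = vec![0u8; 8 << 20];       // 8 MiB per read
                let n = f.read(&mut buf)?;
                if n == 0 { break; }
                buf.truncate(n);
                if chunk_tx.send(buf).is_err() { return Ok(()); }
            }
        }
        Ok(())  // dropping chunk_tx here closes the channel and stops the workers
    });

    // Compute threads: transform each block independently (embarrassingly parallel).
    let n_workers = thread::available_parallelism().map(|n| n.get()).unwrap_or(4);
    let workers: Vec<_> = (0..n_workers)
        .map(|_| {
            let rx = chunk_rx.clone();
            let tx = out_tx.clone();
            thread::spawn(move || {
                for block in rx {
                    let transformed = block;            // placeholder transform
                    if tx.send(transformed).is_err() { break; }
                }
            })
        })
        .collect();
    drop(chunk_rx);
    drop(out_tx);   // the writer stops once the last worker drops its sender

    // Single writer thread: fan the results back in; order does not matter.
    let writer = thread::spawn(move || -> std::io::Result<()> {
        let mut out = File::create("out.dat")?;         // placeholder output
        for block in out_rx {
            out.write_all(&block)?;
        }
        Ok(())
    });

    reader.join().unwrap()?;
    for w in workers { w.join().unwrap(); }
    writer.join().unwrap()?;
    Ok(())
}
```

The bounded channels provide back-pressure, so the reader can't race arbitrarily far ahead of the workers and the workers can't race ahead of the writer.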

That said, if such an architecture is inconvenient then yes, SSDs can deal fine with multiple parallel IO streams. In fact they have multiple hardware IO queues that can accept work concurrently. But those primarily help with small random accesses, much less with large sequential ones.

The worst thing one can do for any IO workload is blocking on small, random reads on a single thread, then doing some compute, then doing the next IO. In that scenario the thread will be neither IO- nor CPU-bound, because the workload alternates between those bottlenecks and leaves the other resource underutilized in the meantime. Parallelizing such a "dumb" approach can mask some of its inefficiencies by exploiting the massive IOPS that today's SSDs support. But it might take more memory than necessary, because you need to keep more stuff in flight at once, or may expose inefficiencies somewhere else in your software stack.
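For illustration only, a sketch of what that anti-pattern looks like (the file name, offsets and record size are made up):

```rust
use std::fs::File;
use std::io::{Read, Seek, SeekFrom};

// The anti-pattern described above, shown only to make it concrete: each small
// random read blocks the thread (disk busy, CPU idle), then the compute step
// runs (CPU busy, disk idle), and the cycle repeats with a queue depth of ~1.
fn slow_scan(path: &str, offsets: &[u64]) -> std::io::Result<u64> {
    let mut file = File::open(path)?;
    let mut total = 0u64;
    for &off in offsets {
        let mut record = [0u8; 512];                    // tiny read, issued synchronously
        file.seek(SeekFrom::Start(off))?;
        file.read_exact(&mut record)?;
        total += record.iter().map(|&b| b as u64).sum::<u64>(); // "compute" placeholder
    }
    Ok(total)
}
```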

Large writes should be very cheap as long as they're not fsync()'ed because they just go to the OS cache which then aggregates them before writing them out to disk.

I'd start with

  • benchmarking disk capabilities or reading its spec sheet
  • gathering some high-level stats about the current code (CPU utilization, user vs. kernel time, IO bandwidth, peak memory use)
  • gathering IO statistics (syscalls, read/write sizes)

and then decide where the lowest-hanging fruit is.

6 Likes

Do you have any recommendations about what tools to use to gather relevant statistics?

GNU time or parsing /proc/self/stat can get you max RSS. perf stat, perf trace and strace for syscall statistics and tracing (with the right filters). strace can significantly slow down the application (so maybe don't use it on production systems) but can print individual call arguments to get a better idea of what's happening. vmstat, htop, iotop, iostat can show CPU and IO utilization. fio can be used to benchmark the disk.

For general profiling see the rust performance book. Some of the tools allow gathering both on-CPU and off-CPU (waiting for IO) time.

And of course you can add statistics to your application to aggregate which processing steps take how much time.
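For example, a small hand-rolled version of that; the stage names and the stand-in workloads are placeholders:

```rust
use std::time::{Duration, Instant};

// Accumulated wall-clock time per processing stage.
#[derive(Default)]
struct StageTimes {
    read: Duration,
    transform: Duration,
}

// Run a closure and add its elapsed time to the given accumulator.
fn timed<T>(slot: &mut Duration, f: impl FnOnce() -> T) -> T {
    let start = Instant::now();
    let result = f();
    *slot += start.elapsed();
    result
}

fn main() {
    let mut times = StageTimes::default();
    let data = timed(&mut times.read, || vec![0u8; 1 << 20]);   // stand-in for a file read
    let _sum: u64 = timed(&mut times.transform, || data.iter().map(|&b| b as u64).sum());
    println!("read: {:?}, transform: {:?}", times.read, times.transform);
}
```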

2 Likes

I ran a little experiment with two threads:

  1. Reads HDF5 files, one by one, and sends the data extracted from each file through a channel to the other; one send per file.
  2. Receives the data from the channel and processes them.

This gives me a thread-load pattern like this

where the read-HDF5 thread is 100% busy, while the other one finishes processing each batch relatively quickly and twiddles its thumbs waiting for the next batch to arrive. No surprises there. A suspicion that this might be the case is what led me to think that reading concurrently might help in the first place.

If I try to read the HDF5 files on multiple threads, then reading HDF5 only ever makes progress on one thread at a time: the recorded stacks show that when one thread stops doing HDF5 stuff, some other thread unblocks and does HDF5 stuff. It doesn't matter whether I use rayon or manually-spawned threads, this seems to be an invariant.

As I have no experience with concurrent reads from persistent storage, I have no idea what is preventing these distinct HDF5 files from being read concurrently (the OS, hardware drivers, HDF5, Rust, other) or whether it is a fundamental problem or something that can be worked around.

I think the easiest way will be to mmap your file, represent the resulting memory as a slice of chunks, and process the slice using rayon or something similar. Assuming your processing is fast enough, the OS prefetcher should keep the SSD saturated, since every thread will "read" data mostly sequentially. Note that depending on the relative speeds of the processing and the SSD in use, you may need more workers than you have cores, so experiment with the number.
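A minimal sketch of that idea, assuming the memmap2 and rayon crates; the chunk size and the per-chunk work are placeholders:

```rust
use memmap2::Mmap;
use rayon::prelude::*;
use std::fs::File;

// Map a file and process fixed-size chunks of it in parallel with rayon.
fn process_file(path: &str) -> std::io::Result<Vec<u64>> {
    let file = File::open(path)?;
    // Safety: assumes no other process truncates or modifies the file while mapped.
    let map = unsafe { Mmap::map(&file)? };

    // Each rayon worker touches its chunk mostly sequentially, so OS readahead
    // has a chance to keep the SSD busy. Tune the chunk size experimentally.
    let results: Vec<u64> = map
        .par_chunks(8 << 20)                                    // 8 MiB chunks
        .map(|chunk| chunk.iter().map(|&b| b as u64).sum())     // placeholder "transform"
        .collect();
    Ok(results)
}
```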

A more sophisticated solution would be to use io-uring to issue asynchronous read requests, which will be executed while you do processing of already read chunks.
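For reference, a minimal single-read sketch using the io-uring crate (the file name and buffer size are placeholders); a real pipeline would keep many reads queued while processing completed ones:

```rust
use io_uring::{opcode, types, IoUring};
use std::fs::File;
use std::os::unix::io::AsRawFd;

fn main() -> std::io::Result<()> {
    // A ring with room for 8 queued operations.
    let mut ring = IoUring::new(8)?;

    let file = File::open("input.dat")?;                // placeholder file
    let mut buf = vec![0u8; 1 << 20];                   // 1 MiB read buffer

    // Build a read request; user_data lets us match the completion to the request.
    let read_e = opcode::Read::new(types::Fd(file.as_raw_fd()), buf.as_mut_ptr(), buf.len() as _)
        .build()
        .user_data(0x42);

    // Safety: the fd and buffer must stay valid until the completion is reaped.
    unsafe { ring.submission().push(&read_e).expect("submission queue is full"); }

    ring.submit_and_wait(1)?;                           // submit and wait for 1 completion
    let cqe = ring.completion().next().expect("completion queue is empty");
    assert_eq!(cqe.user_data(), 0x42);
    println!("read {} bytes", cqe.result());            // a negative result is -errno
    Ok(())
}
```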

1 Like

I think it'd be appropriate to ping @pkolaczk; I believe he's got some tricks for parallelizing reads on SSDs.

The question is what you're blocked on when reading. Are you blocking in the read syscall (which you can check by strace-ing your threads on Linux), in which case you're doing concurrent read requests and it's the OS or hardware stopping concurrent reads, or are you blocked on a userspace mutex (the futex syscall on Linux)?

If you're blocked in a userspace mutex, then your HDF5 library is preventing you from reading multiple files concurrently. If you're blocking in the read syscall on every thread, then you're submitting concurrent reads, but not getting results any faster.

2 Likes

It's hard to tell without further details being provided, but it would fit the pattern you'd see if there's a lock inside the HDF5 library you're using or disk access gets fully serialized, e.g. by a file lock.

This gives me a thread-load pattern like this [...] where the read-HDF5 thread is 100% busy

You're being unclear here. Is that thread just reading the bytes from disk or reading and also deserializing the HDF5 format? If it is the latter then you're not separating IO from compute.

3 Likes

Maybe I'm missing something here, but it seems that this would mean that I have to take on the burden of understanding and decoding the HDF5 format. This is something I'd rather delegate to libraries written by experts in the technology.

Yes, this post of his significantly contributed to my deciding to invest some time in barking up this tree.

strace spits out a few of these

futex(0x7f17b0801910, FUTEX_WAIT_BITSET|FUTEX_CLOCK_REALTIME, 894390, NULL, FUTEX_BITSET_MATCH_ANY) = 0

at the relevant time. So it seems that HDF5 is doing the blocking.

I also did a higher-level experiment: replace the call to the hdf5 read utilities with std::fs::read (which required replacing the original processing of the real data with something that burns a bit of CPU on the incoming bytes): this showed multiple reader threads making progress concurrently. Which is consistent with HDF5 doing the blocking.

Indeed, I should have been more explicit. I'm using the HDF5 library on this thread to read the data in, which does deserialize the data. So it definitely mixes IO and computation.

Looking at GitHub - aldanor/hdf5-rust: HDF5 for Rust (which I assume is the HDF5 library you're using), it has a single global lock for calling into the underlying C library it binds, to guarantee thread-safety even with a thread-unsafe C library underneath you.

You'll need to find another way to get parallelism.

2 Likes

If there's no shared state between the files then a simple approach would be spawning one process per file.

That'd still be blocked by HDF5, either the Rust crate or libhdf5. From the HDF5 documentation:

The HDF5 library can be built in thread-safe mode. The thread-safe version of the HDF5 library effectively serializes the HDF5 library calls. It is thread-safe but not thread-efficient.

From the SO answer I linked above:

Users are often surprised to learn that (1) concurrent access to different datasets in a single HDF5 file and (2) concurrent access to different HDF5 files both require a thread-safe version of the HDF5 library. Although each thread in these examples is accessing different data, the HDF5 library modifies global data structures that are independent of a particular HDF5 dataset or HDF5 file. HDF5 relies on a semaphore around the library API calls in the thread-safe version of the library to protect the data structure from corruption by simultaneous manipulation from different threads. Examples of HDF5 library global data structures that must be protected are the freespace manager and open file lists.

Yes.

Thanks for checking this.

None.

Separate processes, as opposed to threads ... yes ... but how would I do that from within my main process, and communicate the data efficiently from the multiple reader processes to the single (perhaps multi-threaded) compute process?

From the same documentation

Multiple processes should not be confused with multiple threads. Below is a summary regarding multiple "things" accessing a file:

    Multiple processes DO NOT require the thread-safe library (SWMR is required if there is a writer, but not thread-safety).

    Multiple threads DO require the thread-safe library.

The reason for this is that separate processes do not share memory and cannot affect each other. Threads DO share memory and CAN interfere with each other.

which seems to me to be implying (as seems logical to me) that it should be possible to read concurrently from multiple processes as opposed to threads.
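For what it's worth, a minimal sketch of the process-per-file idea; the "transform-worker" binary name is hypothetical and stands in for a helper that reads one HDF5 file and writes its transformed records to stdout. The main process spawns one child per input file and collects each child's output over its stdout pipe:

```rust
use std::io::Read;
use std::process::{Command, Stdio};

fn main() -> std::io::Result<()> {
    let inputs = ["run-000.h5", "run-001.h5"];          // placeholder file list

    // Launch one worker per file first, so they all read concurrently.
    let mut children = Vec::new();
    for path in inputs {
        let child = Command::new("transform-worker")    // hypothetical helper binary
            .arg(path)
            .stdout(Stdio::piped())
            .spawn()?;
        children.push(child);
    }

    // Drain each child's stdout and concatenate; order does not matter here.
    // (For outputs larger than the pipe buffer you'd drain each pipe on its
    // own thread instead of sequentially, to avoid stalling the workers.)
    let mut output = Vec::new();
    for child in &mut children {
        child.stdout.take().expect("stdout was piped").read_to_end(&mut output)?;
        let _status = child.wait()?;
    }

    // `output` now holds all serialized records; write them out in one go.
    Ok(())
}
```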

At the point where you go from multithreading to multiprocessing, I personally would start considering MPI, though I have no experience with MPI Rust bindings. Actually I have no idea whether there is any Rust framework for easy multiprocessing (something in the vein of Julia's Distributed module); I would be very interested if someone here knows whether there is any crate out there.

Or maybe converting the HDF5 files to a less strict file format would actually be the easiest solution? In your original question you hinted that you'd consider moving from HDF5 to another format (you mentioned Arrow).

Might this be a use case for capnproto? I hear that there is a rust implementation, and I read from their website that:

Inter-process communication: Multiple processes running on the same machine can share a Cap’n Proto message via shared memory. No need to pipe data through the kernel. Calling another process can be just as fast and easy as calling another thread.

However, I have never used it and couldn't confirm, from what I read on GitHub, whether the Rust implementation also fulfills these claims.

2 Likes

What you want to achieve is to saturate the full read throughput of the disk. The biggest thing that changed since HDDs is that individual reads interfere with each other much less in a data-dependent way. You mostly have to worry about how many reads (of the block size) your program can issue and process, rather than about the on-disk locality of those blocks.

To keep enough reads in flight, the important thing is recognizing that waiting on single reads is extremely inefficient. The latency of each single random read operation on the SSD may be somewhere on the order of 2^-12 seconds or so in the largely sequential read setting you're alluding to. That's a mere 2^24 B/s if you serialize on the completion of each read in sequence, which means you need roughly 64 parallel queues to get close to even the 6 Gbit/s SATA limit. The numbers look worse for NVMe: you might actually need a thousand parallel queues to saturate those, in particular if you're also writing results. When you run below that limit, your implementation will depend a lot on intelligent caches predicting your usage and issuing reads before you request them. This may work, but it's not very reliable and you'll get a lot of spikes / multimodal distributions in your performance numbers.
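Spelling that estimate out (assuming 4 KiB reads, which is the read size implied by the 2^24 B/s figure):

```latex
\[
  \frac{2^{12}\,\text{B per read}}{2^{-12}\,\text{s per read}}
  = 2^{24}\,\text{B/s} \approx 16\,\text{MiB/s per serialized queue}
\]
\[
  \frac{\sim 600\,\text{MB/s usable SATA bandwidth}}{\sim 16\,\text{MiB/s per queue}}
  \approx \text{a few dozen independent queues, of the order of the ${\sim}64$ above}
\]
```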

The best trick would be to make sure everything in your stack is performing non-sequential IO. Issue reads that are large enough to be properly parallelized internally by these libraries, if they support that on the platform. IMPORTANT: Do not use mmap. And if the library is in Rust, see if it supports an async interface for small requests, which would allow your implementation to compute on reads as they complete, with each thread / process working on multiple queues concurrently.

If Arrow improves goodput then maybe. But it sounds like your data is both large and uniform. If disk IO is indeed the limit at the moment then the compression variants integrated into Arrow may help much more than switching the layout of matrices. You'll need to benchmark / test to be sure though.

"largely sequential read setting" is an imprecise way to put this. It could be misinterpreted as sequential reads being slow. This isn't true. I can get 2 GB/s of my NVMe drive (faster than SATA) with a single thread just issuing large read() calls, no readaheads.

What usually makes things slower than what the hardware can provide is some combination of

A) synchronous reads
B) no readahead
C) small read syscalls (lack of userspace buffering)
D) work being single-threaded
E) delay between issuing reads, i.e. interleaving compute and IO (only applicable if points A and B are also true)

Attacking any of those can yield improvements, some more so than others.

IMPORTANT: Do not use mmap

Eh, this thread seems to be about batch jobs, not databases. Fiddling a bit with madvise could probably make mmap work in that scenario.

For example, at $work we once had an issue with a 3rd-party raytracing library that could read data either from a file or from memory buffers. The vendor's file loading routine got pretty much all the points listed above wrong. So one of the solutions was to mmap the files, do some madvise calls and then pass the mapped buffer to the library instead. Same data, almost the same function call, but it turned minutes into seconds.
Would I have implemented it that way if it had been our library? No. But given the available APIs this was a fairly easy win.
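As a concrete illustration of that trick, a hedged sketch assuming the memmap2 crate (advise is Unix-only); the `consume` callback stands in for the vendor's load-from-memory entry point:

```rust
use memmap2::{Advice, Mmap};
use std::fs::File;

// Map a file, hint the kernel about the access pattern, and hand the bytes to
// a "load from memory" entry point.
fn load_via_mmap(path: &str, consume: impl FnOnce(&[u8])) -> std::io::Result<()> {
    let file = File::open(path)?;
    // Safety: assumes no other process truncates the file while it is mapped.
    let map = unsafe { Mmap::map(&file)? };

    // Tell the kernel we'll read the mapping front to back, so it reads ahead
    // aggressively instead of faulting one page at a time.
    map.advise(Advice::Sequential)?;
    map.advise(Advice::WillNeed)?;

    consume(&map[..]);
    Ok(())
}
```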