Merge sorting 100GB worth of data


#1
  1. This is not 100% accurate: many details have been changed to abstract away all but the “core” of the problem.

  2. We have

```rust
type Foo = (u64, f64);

// For lhs: Foo, rhs: Foo, we say lhs < rhs if
//     lhs.0 < rhs.0 || (lhs.0 == rhs.0 && lhs.1 < rhs.1)
```
  3. We have 100GB worth of Foo, stored in 1000 files of 0.1GB each. The contents of each file are NOT sorted.

  4. Our goal is to produce a 100GB output file of all tuples, sorted.

  5. We have a machine with 128GB RAM. (We don’t, but relative to the data set it’s accurate, i.e. 1.5 * data set > RAM > data set.)

  6. The machine has 24 cores.

  7. Question: what is the best way to implement this sort?
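Since f64 is not Ord, a plain sort() on Vec<Foo> won’t compile. A minimal sketch of a comparator for the ordering defined above, assuming it is acceptable to compare the f64 with f64::total_cmp (it matches < for non-NaN values, except that it puts -0.0 before +0.0); cmp_foo and sort_foos are illustrative names:

```rust
use std::cmp::Ordering;

/// The record type from the question.
type Foo = (u64, f64);

/// Lexicographic order: compare `.0` first, then `.1`.
/// f64 is not `Ord`, so `.1` is compared with `f64::total_cmp`.
fn cmp_foo(lhs: &Foo, rhs: &Foo) -> Ordering {
    lhs.0.cmp(&rhs.0).then_with(|| lhs.1.total_cmp(&rhs.1))
}

/// Sort a slice of `Foo` in place (no extra allocation) with the comparator above.
fn sort_foos(v: &mut [Foo]) {
    v.sort_unstable_by(cmp_foo);
}
```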

===== Some things I have tried:

  1. Single thread: read all the files into one Vec and sort it. Bad, as it only uses 1 of 24 cores.

  2. Rayon par_iter + map + reduce: sort each file in the map, then use a reduce that (1) concatenates two Vecs and (2) sorts the result. This doesn’t work, as we run out of memory (since RAM < 1.5 * dataset).

  3. Rayon par_iter + map: for each file, we sort it. Then, afterwards, we do:

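```rust
// `files`: the input files; `read_and_sort_file` reads one file and sorts it.
// Both stand in for the real code (not shown).
let data: Vec<Vec<Foo>> = files.par_iter().map(read_and_sort_file).collect();
let mut ans: Vec<Foo> = Vec::new();

// into_iter() is important: it frees each chunk's memory as `ans` grows.
for d in data.into_iter() {
    ans.merge_sort(d); // own helper (not shown): merges sorted `d` into sorted `ans`
}
```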
So approach 3 is the best I have so far, but the merge step only uses a single core.
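The merge_sort helper used in approach 3 isn’t shown. A minimal sketch of a two-way merge of two sorted vectors, using a hypothetical free function merge_two and comparing the f64 with total_cmp:

```rust
use std::cmp::Ordering;

type Foo = (u64, f64);

fn cmp_foo(a: &Foo, b: &Foo) -> Ordering {
    a.0.cmp(&b.0).then_with(|| a.1.total_cmp(&b.1))
}

/// Hypothetical stand-in for the `merge_sort` helper: merges two already-sorted
/// vectors into one sorted vector. Single-threaded, and the output is allocated
/// while both inputs are still alive.
fn merge_two(a: Vec<Foo>, b: Vec<Foo>) -> Vec<Foo> {
    let mut out = Vec::with_capacity(a.len() + b.len());
    let mut ia = a.into_iter().peekable();
    let mut ib = b.into_iter().peekable();
    loop {
        let take_a = match (ia.peek(), ib.peek()) {
            (Some(x), Some(y)) => cmp_foo(x, y) != Ordering::Greater, // ties: `a` first
            (Some(_), None) => true,
            (None, Some(_)) => false,
            (None, None) => break,
        };
        out.push(if take_a { ia.next().unwrap() } else { ib.next().unwrap() });
    }
    out
}
```

Since the output is allocated while both inputs are still alive, each call like this briefly holds both inputs plus the result, and it runs on one core.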

=====

Has anyone dealt with this before? If so, how did you do it? The main issues are:

  1. 1.5 * dataset > RAM, so we don’t have room to “duplicate” the dataset.

  2. We have more than one core, and I’d like to use all of them.

Thanks!


#2

Did you try Rayon’s own sort methods? The stable par_sort allocates a full temporary buffer, which you may not want (but see below), while par_sort_unstable works completely in place. (These are stable/unstable in the ordering of equal items, not in quality of code.)
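Rayon’s par_sort and par_sort_unstable require T: Ord, which (u64, f64) isn’t, because of the f64. Besides the _by variants that take a comparator, one option is an Ord newtype. A sketch, assuming total_cmp ordering for the f64 is acceptable; Key and sort_keys are hypothetical names, and std’s sort_unstable stands in for the rayon call so the example needs no external crate:

```rust
use std::cmp::Ordering;

/// Newtype over the question's (u64, f64) that implements `Ord`, so that
/// `sort_unstable()` (or rayon's `par_sort_unstable()`) can be called directly.
/// The f64 field is ordered with `f64::total_cmp`.
#[derive(Clone, Copy, Debug)]
struct Key(u64, f64);

impl Ord for Key {
    fn cmp(&self, other: &Self) -> Ordering {
        self.0.cmp(&other.0).then_with(|| self.1.total_cmp(&other.1))
    }
}
impl PartialOrd for Key {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
impl PartialEq for Key {
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other) == Ordering::Equal
    }
}
impl Eq for Key {}

/// In-place, non-allocating sort. With rayon in scope (`use rayon::prelude::*;`)
/// the parallel equivalent would be `v.par_sort_unstable()`.
fn sort_keys(v: &mut [Key]) {
    v.sort_unstable();
}
```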

As for the extra memory required, you may be better off sorting on some indirection, like a Vec<&T> referencing the items in the full data set, or, to avoid lifetimes, a Vec<usize> sorted with sort_by_key by indexing into the data. This way you’re not moving so much data around just to sort it.
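A sketch of the index-based variant, assuming the data is already in one slice; sorted_indices is a hypothetical name. It uses sort_unstable_by rather than sort_by_key because the f64 tiebreak needs a comparator:

```rust
type Foo = (u64, f64);

/// Sort a permutation of indices instead of moving the records themselves.
/// usize indices are needed at this scale: 100GB of 16-byte records is
/// 6.25e9 records, which is more than u32::MAX.
fn sorted_indices(data: &[Foo]) -> Vec<usize> {
    let mut idx: Vec<usize> = (0..data.len()).collect();
    // sort_by_key on data[i].0 alone would compile, but ignores the f64 tiebreak.
    idx.sort_unstable_by(|&i, &j| {
        let (a, b) = (&data[i], &data[j]);
        a.0.cmp(&b.0).then_with(|| a.1.total_cmp(&b.1))
    });
    idx
}
```

The records can then be read out in order with idx.iter().map(|&i| data[i]). Each index is a usize (8 bytes on 64-bit targets), half the size of a Foo, which is typically 16 bytes.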


#3

Sort all the files and write them out again, then merge sort the files. Old skool style.
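A minimal in-memory sketch of the two phases, assuming each Vec<Foo> stands in for one file (in practice each sorted run would be written to disk and streamed back during the merge). sort_runs, merge_runs and Head are hypothetical names; phase 1 spreads the runs over a fixed number of scoped threads, and phase 2 is a k-way merge with a min-heap:

```rust
use std::cmp::{Ordering, Reverse};
use std::collections::BinaryHeap;
use std::thread;

type Foo = (u64, f64);

fn cmp_foo(a: &Foo, b: &Foo) -> Ordering {
    a.0.cmp(&b.0).then_with(|| a.1.total_cmp(&b.1))
}

/// Heap entry: the current head record of run `run`.
struct Head { rec: Foo, run: usize }
impl Ord for Head {
    fn cmp(&self, o: &Self) -> Ordering {
        cmp_foo(&self.rec, &o.rec).then(self.run.cmp(&o.run))
    }
}
impl PartialOrd for Head {
    fn partial_cmp(&self, o: &Self) -> Option<Ordering> { Some(self.cmp(o)) }
}
impl PartialEq for Head {
    fn eq(&self, o: &Self) -> bool { self.cmp(o) == Ordering::Equal }
}
impl Eq for Head {}

/// Phase 1: sort every run ("file") independently, spread over `threads` scoped threads.
fn sort_runs(runs: &mut [Vec<Foo>], threads: usize) {
    let t = threads.max(1);
    let per_thread = ((runs.len() + t - 1) / t).max(1);
    thread::scope(|s| {
        for group in runs.chunks_mut(per_thread) {
            s.spawn(move || {
                for run in group {
                    run.sort_unstable_by(cmp_foo);
                }
            });
        }
    });
}

/// Phase 2: k-way merge; the min-heap holds the current head of each run.
fn merge_runs(runs: Vec<Vec<Foo>>) -> Vec<Foo> {
    let total: usize = runs.iter().map(Vec::len).sum();
    let mut out = Vec::with_capacity(total);
    let mut iters: Vec<_> = runs.into_iter().map(|r| r.into_iter()).collect();
    let mut heap = BinaryHeap::new();
    for (run, it) in iters.iter_mut().enumerate() {
        if let Some(rec) = it.next() {
            heap.push(Reverse(Head { rec, run }));
        }
    }
    while let Some(Reverse(Head { rec, run })) = heap.pop() {
        out.push(rec);
        if let Some(next) = iters[run].next() {
            heap.push(Reverse(Head { rec: next, run }));
        }
    }
    out
}
```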

Do you have an idea of the range of the data? Like, if the u64 is a timestamp in a certain range or similar? Then you can read the files and group elements into range-bucket output files, rather than back into their original files. Then you can read the bucket files in order and sort each one internally.
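A sketch of the bucketing idea, assuming the overall key range [min, max] is known up front and keys are spread evenly enough for equal-width buckets to be useful; bucket_of and bucket_sort are hypothetical names, and the in-memory buckets stand in for the bucket output files:

```rust
type Foo = (u64, f64);

/// Split [min, max] into `n` equal-width buckets (assumes min <= key <= max).
/// Every key in bucket b is smaller than every key in bucket b + 1, so sorting
/// each bucket and concatenating them in order gives a sorted whole.
fn bucket_of(key: u64, min: u64, max: u64, n: u64) -> usize {
    let span = (max - min) as u128 + 1; // number of possible keys; u128 avoids overflow
    (((key - min) as u128 * n as u128) / span) as usize
}

/// In-memory stand-in for the bucketing pass: route each record to its bucket
/// (in practice, append it to that bucket's file), then sort each bucket.
fn bucket_sort(data: &[Foo], min: u64, max: u64, n: usize) -> Vec<Vec<Foo>> {
    let mut buckets = vec![Vec::new(); n];
    for &rec in data {
        buckets[bucket_of(rec.0, min, max, n as u64)].push(rec);
    }
    for b in &mut buckets {
        b.sort_unstable_by(|a, c| a.0.cmp(&c.0).then_with(|| a.1.total_cmp(&c.1)));
    }
    buckets
}
```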

A variant of this is a quick indexing pass over the files to determine the key range of each file; that way you only need to read in some of the files as you progress through the range. It won’t help much if the sort key is randomly distributed, but otherwise it can save some memory.
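A sketch of the indexing pass, assuming each file can be scanned once to record its smallest and largest key; FileRange, index_file and files_for_range are hypothetical names, and the slices stand in for files:

```rust
type Foo = (u64, f64);

/// Result of the indexing pass: the smallest and largest key in one file.
#[derive(Debug, Clone, Copy, PartialEq)]
struct FileRange { min: u64, max: u64 }

/// Scan one file's records and record its key range (None for an empty file).
fn index_file(records: &[Foo]) -> Option<FileRange> {
    let mut keys = records.iter().map(|r| r.0);
    let first = keys.next()?;
    let (min, max) = keys.fold((first, first), |(lo, hi), k| (lo.min(k), hi.max(k)));
    Some(FileRange { min, max })
}

/// Indices of the files whose key range overlaps [lo, hi]: only these need
/// to be in memory while producing that part of the output.
fn files_for_range(index: &[Option<FileRange>], lo: u64, hi: u64) -> Vec<usize> {
    index
        .iter()
        .enumerate()
        .filter_map(|(i, r)| match r {
            Some(r) if r.min <= hi && r.max >= lo => Some(i),
            _ => None,
        })
        .collect()
}
```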

If it won’t all fit in memory, it won’t all fit in memory. But a sequential pass through 100GB of data can be pretty quick, especially if you only need to do it once in preparation.


#4

I was looking for a crate to do this the other day, couldn’t find one. I have it on my list of “ideas of things to do”.


#5

It sounds like you might want to look at how databases do large sorts: they implement some sort of buffering strategy. I’m not a db expert, though…

The approach that comes to my mind is: if each of the N files is F bytes and memory is M bytes, perhaps you can logically break each file into (F * (N+1))/M segments of size M/(N+1) bytes.
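The arithmetic can be checked with a small helper, assuming decimal units (0.1GB = 100,000,000 bytes) and that memory is split evenly between the N input buffers and one output buffer; segment_plan is a hypothetical name:

```rust
/// N files of F bytes, M bytes of memory: each of the N inputs plus the output
/// gets a buffer of M/(N+1) bytes, so each file is read in about F*(N+1)/M
/// segments, rounded up. Returns (segment_size_bytes, segments_per_file).
/// Assumes mem_bytes >= n_files + 1, so the segment size is non-zero.
fn segment_plan(n_files: u64, file_bytes: u64, mem_bytes: u64) -> (u64, u64) {
    let seg = mem_bytes / (n_files + 1);
    let segments = (file_bytes + seg - 1) / seg; // ceiling division
    (seg, segments)
}
```

With the question’s numbers (N = 1000, F = 0.1GB, M = 128GB) each buffer is about 128MB, larger than a whole file, so every (sorted) file fits in a single segment; with 16GB of memory each file would be read in 7 segments.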

Thus, you would have 1001 buffers, one for each input file, plus an extra one to buffer the output. For each file, you load the first chunk. Then, you proceed to merge the contents of the file buffers and write to the output buffer. When the output buffer is full, you spill it to disk. When you have finished all the data from one of the buffers, you load the next segment of that file.
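A minimal sketch of that buffering scheme, assuming the inputs are already-sorted runs; the slices stand in for the sorted files and the spill callback stands in for writing the output buffer to disk. Input, buffered_merge and less are hypothetical names, and the smallest head is found with a linear scan for brevity:

```rust
type Foo = (u64, f64);

fn less(a: &Foo, b: &Foo) -> bool {
    a.0.cmp(&b.0).then_with(|| a.1.total_cmp(&b.1)).is_lt()
}

/// One input buffer holding at most `seg` records of a sorted "file".
struct Input<'a> {
    file: &'a [Foo],
    offset: usize, // where the next segment starts in `file`
    seg: usize,
    buf: Vec<Foo>,
    pos: usize, // next unread record in `buf`
}

impl<'a> Input<'a> {
    fn new(file: &'a [Foo], seg: usize) -> Self {
        let mut input = Input { file, offset: 0, seg, buf: Vec::with_capacity(seg), pos: 0 };
        input.refill();
        input
    }
    /// Load the next segment of the file (the buffer stays empty at end of file).
    fn refill(&mut self) {
        let end = (self.offset + self.seg).min(self.file.len());
        self.buf.clear();
        self.buf.extend_from_slice(&self.file[self.offset..end]);
        self.offset = end;
        self.pos = 0;
    }
    fn peek(&self) -> Option<&Foo> {
        self.buf.get(self.pos)
    }
    fn advance(&mut self) {
        self.pos += 1;
        if self.pos == self.buf.len() {
            self.refill();
        }
    }
}

/// Merge sorted runs with one `seg`-record buffer per input plus an output
/// buffer of `seg` records; `spill` is called whenever the output buffer fills.
fn buffered_merge(files: &[&[Foo]], seg: usize, mut spill: impl FnMut(&[Foo])) {
    let mut inputs: Vec<Input<'_>> = files.iter().map(|&f| Input::new(f, seg)).collect();
    let mut out = Vec::with_capacity(seg);
    loop {
        // Linear scan for the smallest head; a BinaryHeap would make this O(log N).
        let mut best: Option<usize> = None;
        for i in 0..inputs.len() {
            if let Some(x) = inputs[i].peek() {
                if best.map_or(true, |b| less(x, inputs[b].peek().unwrap())) {
                    best = Some(i);
                }
            }
        }
        let Some(i) = best else { break };
        out.push(*inputs[i].peek().unwrap());
        inputs[i].advance();
        if out.len() == seg {
            spill(&out);
            out.clear();
        }
    }
    if !out.is_empty() {
        spill(&out);
    }
}
```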


#6

How do we get all the elements into one large Vec in the first place? Do we limit reading to a single thread, or is there some way to allocate one large Vec, then allow different threads to write to different subsections of it?


#7

The “u64” is actually a (u32, u32) representing indices of a sparse matrix. The number of distinct “u64” values the data uses is huge, so “binning” type techniques won’t work.
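If the u64 is built from the two u32 indices, the packing order determines what the sort produces. A sketch assuming the row goes in the high 32 bits (pack and unpack are hypothetical names); with that layout, sorting by the u64 sorts by row, then column:

```rust
/// Pack a sparse-matrix coordinate (row, col) into one u64, row in the high half.
/// Comparing packed values is then the same as comparing (row, col) lexicographically.
fn pack(row: u32, col: u32) -> u64 {
    ((row as u64) << 32) | col as u64
}

/// Inverse of `pack`.
fn unpack(key: u64) -> (u32, u32) {
    ((key >> 32) as u32, key as u32)
}
```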


#8

Sure, you can do that using chunks_mut (or split_at_mut for finer control), then use some version of scoped threads to pass the &mut references to each thread. Not sure how that would work with rayon, though.
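A sketch of that approach with std::thread::scope instead of rayon, assuming every file holds the same number of records so each file maps to one fixed-size chunk; fill_parallel and the load callback are hypothetical stand-ins for the real file reading:

```rust
use std::thread;

type Foo = (u64, f64);

/// Preallocate one Vec, split it with `chunks_mut`, and hand each `&mut` chunk
/// to a scoped thread that fills it. `load(i, slot)` stands in for reading
/// file `i` into `slot`; each file is assumed to hold exactly `per_file` records
/// (`per_file` must be non-zero).
fn fill_parallel(
    n_files: usize,
    per_file: usize,
    load: impl Fn(usize, &mut [Foo]) + Sync,
) -> Vec<Foo> {
    let mut all = vec![(0u64, 0.0f64); n_files * per_file];
    let load = &load; // shared reference, so every thread can call it
    thread::scope(|s| {
        for (i, chunk) in all.chunks_mut(per_file).enumerate() {
            s.spawn(move || load(i, chunk));
        }
    });
    all
}
```

With 1000 files you would probably give each thread several files’ worth of chunks rather than spawning one thread per file.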

With some algorithms, you may save time by not fully sorting each individual Vec before merging, because you can use BinaryHeap instead.
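One possible reading of this suggestion (an interpretation, not necessarily what was meant): heapify each unsorted Vec, which BinaryHeap::from does in linear time, and merge by repeatedly taking the smallest top across the heaps, so no run is fully sorted up front. Key and heap_merge are hypothetical names:

```rust
use std::cmp::{Ordering, Reverse};
use std::collections::BinaryHeap;

/// Ord wrapper for the question's (u64, f64); the f64 is compared with total_cmp.
#[derive(Clone, Copy, Debug)]
struct Key(u64, f64);
impl Ord for Key {
    fn cmp(&self, o: &Self) -> Ordering {
        self.0.cmp(&o.0).then_with(|| self.1.total_cmp(&o.1))
    }
}
impl PartialOrd for Key {
    fn partial_cmp(&self, o: &Self) -> Option<Ordering> { Some(self.cmp(o)) }
}
impl PartialEq for Key {
    fn eq(&self, o: &Self) -> bool { self.cmp(o) == Ordering::Equal }
}
impl Eq for Key {}

/// Merge *unsorted* runs: each run becomes a min-heap (O(n) heapify), and an
/// outer heap holds each run's current minimum together with the run index.
fn heap_merge(runs: Vec<Vec<Key>>) -> Vec<Key> {
    let mut heaps: Vec<BinaryHeap<Reverse<Key>>> = runs
        .into_iter()
        .map(|r| BinaryHeap::from(r.into_iter().map(Reverse).collect::<Vec<_>>()))
        .collect();
    let mut outer = BinaryHeap::new();
    for (i, h) in heaps.iter_mut().enumerate() {
        if let Some(Reverse(k)) = h.pop() {
            outer.push(Reverse((k, i)));
        }
    }
    let mut out = Vec::new();
    while let Some(Reverse((k, i))) = outer.pop() {
        out.push(k);
        if let Some(Reverse(next)) = heaps[i].pop() {
            outer.push(Reverse((next, i)));
        }
    }
    out
}
```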