Multiplex batches of read file lines to rayon tasks

Hello,
I am making a CLI tool to produce a histogram out of a JSON log file (one JSON object per line) -- it counts the distinct occurrences of a particular JSON object field.

I had no problems following the recommended BufReader.read_line idiom with a single mutable String buffer and making it work reliably.

But now I'd like to parallelise it. I gather there are a lot of ways to do this, but, whether due to an XY problem or because of how performant my equivalent Elixir code is, I'd like the parallel implementation to roughly do the following:

  1. Serially (single thread) read the file.
  2. Make and collect batches of, say, 500 lines.
  3. Send each batch to a separate thread.
  4. Each separate thread does JSON parsing of each line in its batch and updates the shared-state histogram (a HashMap<String, u64>).

In the IRC channel I got told that "a quick loop of try_sends and a blocking send for each line (make a thread pool, send work to them from a single reader thread over a bounded channel)" would likely be the quickest solution without involving rayon.
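For concreteness, here is my rough understanding of that suggestion in code form (a sketch with invented names like `histogram` and `n_workers`; the JSON parsing is stubbed out as plain line counting -- corrections welcome):

```rust
use std::collections::HashMap;
use std::sync::mpsc::sync_channel;
use std::sync::{Arc, Mutex};
use std::thread;

/// Batch `lines` into groups of `batch_size`, fan the batches out to
/// `n_workers` threads over a bounded channel, and merge the per-worker
/// histograms at the end.
fn histogram(
    lines: impl Iterator<Item = String>,
    batch_size: usize,
    n_workers: usize,
) -> HashMap<String, u64> {
    // Bounded channel: the reader blocks when the workers fall behind.
    let (tx, rx) = sync_channel::<Vec<String>>(8);
    // std's mpsc Receiver can't be shared between threads directly,
    // so wrap it in a mutex. (A crossbeam channel would avoid this.)
    let rx = Arc::new(Mutex::new(rx));

    let workers: Vec<_> = (0..n_workers)
        .map(|_| {
            let rx = Arc::clone(&rx);
            thread::spawn(move || {
                let mut local: HashMap<String, u64> = HashMap::new();
                loop {
                    // Lock only for the duration of this one statement.
                    let msg = rx.lock().unwrap().recv();
                    match msg {
                        Ok(batch) => {
                            for line in batch {
                                // Real code would serde_json::from_str the
                                // line and count a field; this counts the
                                // raw line instead.
                                *local.entry(line).or_insert(0) += 1;
                            }
                        }
                        Err(_) => break, // sender dropped: no more work
                    }
                }
                local // each worker returns its private histogram
            })
        })
        .collect();

    // Single reader: accumulate batches and send them down the channel.
    let mut batch = Vec::with_capacity(batch_size);
    for line in lines {
        batch.push(line);
        if batch.len() == batch_size {
            tx.send(std::mem::take(&mut batch)).unwrap();
        }
    }
    if !batch.is_empty() {
        tx.send(batch).unwrap();
    }
    drop(tx); // closing the channel lets the workers exit their loops

    // Merge the per-worker maps instead of sharing one behind a lock.
    let mut merged = HashMap::new();
    for w in workers {
        for (k, v) in w.join().unwrap() {
            *merged.entry(k).or_insert(0) += v;
        }
    }
    merged
}

fn main() {
    let lines = ["a", "b", "a", "c", "a"].iter().map(|s| s.to_string());
    let h = histogram(lines, 2, 4);
    println!("{:?}", h.get("a")); // Some(3)
}
```

Merging per-worker maps at the end avoids contending on a single shared HashMap behind a Mutex.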

My issue is: I am still rather new and don't know many of Rust's idioms yet. Also, rayon seems not to need the concept of batches; it claims it can automatically adapt to load, so maybe the batch-producing part can be scrapped entirely?

Can somebody help with some sample code snippets? I am not looking to have my homework written for me; I just need a good starting point.

(F.ex. I have no clue how I could translate a BufReader.read_line loop into a 500-line chunk producer, or even into a simple iterator producer.)
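EDIT: for later readers, here is the kind of chunk producer I had in mind, sketched with std::iter::from_fn so no Iterator impl is needed (`batches` is my own name; the 500 comes from the plan above):

```rust
use std::io::{BufRead, BufReader, Read};

/// Turn any reader into an iterator of up-to-`batch_size`-line batches,
/// driven by a single read_line loop with one reused String buffer.
fn batches<R: Read>(rd: R, batch_size: usize) -> impl Iterator<Item = Vec<String>> {
    let mut rd = BufReader::new(rd);
    std::iter::from_fn(move || {
        let mut batch = Vec::with_capacity(batch_size);
        let mut line = String::new();
        while batch.len() < batch_size {
            line.clear(); // reuse the same buffer for every read_line
            match rd.read_line(&mut line) {
                Ok(0) => break, // EOF
                // Each line must become an owned String anyway if it is
                // going to be sent to another thread, so this per-line
                // allocation is hard to avoid here.
                Ok(_) => batch.push(line.trim_end().to_string()),
                Err(e) => panic!("read error: {e}"),
            }
        }
        if batch.is_empty() { None } else { Some(batch) }
    })
}

fn main() {
    let data = "one\ntwo\nthree\nfour\nfive\n";
    for batch in batches(data.as_bytes(), 2) {
        println!("{:?}", batch);
    }
}
```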

The following snippet would open the logs file and parse/process every line in parallel. I think rayon is the go-to library for such a problem; it is designed for data parallelism. But please do not try to combine mpsc or Mutexes with rayon, as that may cause deadlocks.

use rayon::prelude::*;
use serde::Deserialize;
use std::io::BufRead;

/// This describes the following JSON object: {"f1": 34}
#[derive(Deserialize, Debug)]
struct LogLine {
    f1: u32,
}

fn main() {
    let fd = std::fs::File::open("logs").unwrap();
    let reader = std::io::BufReader::new(fd);

    reader
        .lines()        // split into lines serially
        .filter_map(Result::ok)
        .par_bridge()   // parallelize
        .filter_map(|line: String| serde_json::from_str(&line).ok()) // filter out bad lines
        .for_each(|v: LogLine| {
           // do some processing (in parallel)
           println!("X={}", v.f1);
        });
}

(Fully edited the previous version of this since it was quite the rookie question.)

Doesn't .lines() make a new String for each line? Is there a way to make the BufReader.read_line idiom (with a single mutable String buffer) yield an Iterator? I am still very new and I am not sure I could properly implement Iterator myself.

You can use a single string buffer like this:

use rayon::prelude::*;
use serde::Deserialize;
use std::io::Read;

/// This describes the following JSON object: {"f1": 34}
#[derive(Deserialize, Debug)]
struct LogLine {
    f1: u32,
}

fn main() {
    let mut fd = std::fs::File::open("logs").unwrap();
    let len = fd.metadata().unwrap().len() as usize;
    let mut file = String::with_capacity(len);
    fd.read_to_string(&mut file).unwrap(); // This reads the entire file into memory.
    drop(fd);

    file.lines()        // split to lines serially
        .par_bridge()   // parallelize
        .filter_map(|line| serde_json::from_str(line).ok())
        .for_each(|v: LogLine| {
            // do some processing (in parallel)
            println!("X={}", v.f1);
        });
}

This uses str::lines instead of BufRead::lines, which returns references into the same string instead of a new allocation per line.

That would read the entire file into memory. Since I expect those files to be 1 GB or more... I suppose it's still not a big deal, but for now I will go with the one String allocation per line.

I mean, you could also change it to read max 100 MB into memory, and then process all the complete lines in that, and then do the next 100 MB of lines.

That's just the thing: still learning so fine-tuning those algorithms still doesn't come naturally -- I don't know most of the Rust traits and my brain still doesn't bend that way. :slight_smile:

You could do it like this playground.
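In case the playground link rots, the general chunk-and-carry idea looks something like this (not the exact playground code; CHUNK_SIZE is tiny here just to exercise the carry-over, and you'd use something like 100 MB for real files):

```rust
use std::io::Read;

const CHUNK_SIZE: usize = 16; // tiny for demonstration; ~100 MB in practice

/// Read `rd` in fixed-size chunks, carrying any cut-off trailing line over
/// to the next round, and call `f` on every complete line.
fn for_each_line<R: Read>(mut rd: R, mut f: impl FnMut(&str)) {
    let mut s: Vec<u8> = Vec::with_capacity(CHUNK_SIZE);
    loop {
        // `s` may still hold the incomplete tail carried over from the
        // previous round, hence reading only CHUNK_SIZE - s.len() more.
        let n = rd
            .by_ref()
            .take((CHUNK_SIZE - s.len()) as u64)
            .read_to_end(&mut s)
            .unwrap();
        if n == 0 && s.is_empty() {
            break; // EOF and nothing carried over
        }
        // Everything up to the last '\n' is complete lines; the remainder
        // belongs to the next chunk. (Single lines longer than CHUNK_SIZE
        // are not handled by this sketch.)
        let end = match s.iter().rposition(|&b| b == b'\n') {
            Some(i) => i + 1,
            None => s.len(), // EOF without a trailing newline
        };
        for line in s[..end].split(|&b| b == b'\n').filter(|l| !l.is_empty()) {
            f(std::str::from_utf8(line).unwrap());
        }
        s.drain(..end); // keep only the leftover tail for the next round
        if n == 0 {
            break; // EOF: the final partial chunk has been processed
        }
    }
}

fn main() {
    let data = "alpha\nbeta\ngamma\ndelta\nepsilon\nzeta\neta\ntheta\n";
    let mut count = 0;
    for_each_line(data.as_bytes(), |_line| count += 1);
    println!("{count}"); // 8
}
```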


Can I ask ... what does this do:

fd.by_ref().take((CHUNK_SIZE - s.len()) as u64).read_to_end(&mut s).unwrap();

.. specifically (CHUNK_SIZE - s.len())

Aren't CHUNK_SIZE and s.len() the same?

Also, sorry @dimitarvp for "hijacking" this post, but I hope you'll find this helpful because I roughly want to understand the same:

Say I read the file into a Vec of Vecs, i.e. a file_vec containing rec_vecs, like this:

// smf is a byte array like [u8; number]
let smf_bytes = smf.len();
let mut pos = 0;
let mut file_vec: Vec<Vec<u8>> = Vec::new();
while pos < smf_bytes {
    // each record starts with a 2-byte big-endian length
    let rec_len = u16::from_be_bytes(smf[pos..pos + 2].try_into().unwrap());
    let vec_size = rec_len as usize;
    let mut rec_vec = vec![0u8; vec_size];
    // println!("pos:{} rec_len:{} vec_size:{}", pos, rec_len, vec_size);
    rec_vec.copy_from_slice(&smf[pos..pos + vec_size]); // no .to_vec() needed
    file_vec.push(rec_vec);
    pos += vec_size;
}
file_vec.par_iter()
    .for_each(|rec| do_it(rec));

... if my file_vec grows above, say, 500 MB, send it to rayon to process.
Not exactly 500 MB, because I don't want to handle broken records -- so file_vec can grow to around 500 MB, such that it only ever contains complete, unbroken rec_vecs.
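The flush-at-a-size-cap part could be sketched like this, building on the loop above (`process_in_batches`, `max_bytes`, and `flush` are invented names; `flush` is where the rayon par_iter would go):

```rust
/// Walk length-prefixed records in `smf`, accumulating whole records into a
/// batch and flushing whenever the batch grows past `max_bytes`. Only
/// complete records ever enter a batch, so none is split across flushes.
fn process_in_batches(smf: &[u8], max_bytes: usize, mut flush: impl FnMut(&[Vec<u8>])) {
    let mut batch: Vec<Vec<u8>> = Vec::new();
    let mut batch_bytes = 0;
    let mut pos = 0;
    while pos + 2 <= smf.len() {
        // each record starts with a 2-byte big-endian length that counts
        // the whole record, prefix included
        let rec_len = u16::from_be_bytes(smf[pos..pos + 2].try_into().unwrap()) as usize;
        assert!(rec_len >= 2 && pos + rec_len <= smf.len(), "corrupt record length");
        batch_bytes += rec_len;
        batch.push(smf[pos..pos + rec_len].to_vec());
        pos += rec_len;
        if batch_bytes >= max_bytes {
            flush(&batch); // e.g. batch.par_iter().for_each(|rec| do_it(rec))
            batch.clear();
            batch_bytes = 0;
        }
    }
    if !batch.is_empty() {
        flush(&batch); // whatever is left at EOF
    }
}

fn main() {
    // Two fake records, each a 2-byte length prefix plus payload.
    let mut smf = Vec::new();
    smf.extend_from_slice(&4u16.to_be_bytes());
    smf.extend_from_slice(b"ab");
    smf.extend_from_slice(&5u16.to_be_bytes());
    smf.extend_from_slice(b"cde");
    let mut flushes = 0;
    process_in_batches(&smf, 4, |batch| {
        flushes += 1;
        println!("batch of {} record(s)", batch.len());
    });
    println!("{flushes} flushes"); // 2 flushes
}
```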

EDIT:
It would be amazing to make use of new libraries that will help with this problem -
https://docs.rs/blocking/0.2.0/blocking/ ... if it's relevant here.

No, s.len() returns the length, not the capacity, so in the first iteration the length is 0, and in later iterations it is how many bytes were in the incomplete line that was transferred to the next chunk.

This topic was automatically closed 90 days after the last reply. New replies are no longer allowed.