[Multithreading] What are some effective ways to accomplish this task in Rust?

I am writing a program which needs to take a huge input file, anywhere between 15 MB and 5 GB, and then scan through it, similar to grep. I wanted to split the input file into 5 sections and feed each part to a separate thread, e.g. thread 1 covers lines 0-160,000, thread 2 covers lines 160,001-320,000, and so on.

So I have shared data: the buffer holding the data, which is sliced up and handed to the different threads for READING ONLY; the quotient of the number of lines in the file divided by the number of threads; and the terms to search for. I combined these into a struct and created an Arc from it. The idea was to feed this Arc into a wrapper function, which clones the Arc 5 separate times (in the case of 5 threads) and then moves each clone into the closure passed to thread::spawn.

The following example program actually works fine, despite me trying to cause a problem:

use std::time::Duration;
use std::thread;
use std::sync::Arc;
struct Shared
{
    name: String,
    age: u32
}

fn print_person(person: &Shared)
{
    println!("Name: {}\nAge: {}", person.name, person.age)
}
fn main()
{
    let person1 = Shared {name: "James".to_string(), age: 32};
    let main_arc = Arc::new(person1);
    do_threading(main_arc);

}

fn do_threading(main_arc: Arc<Shared>)
{
    let p1_arc = main_arc.clone();
    let p2_arc = main_arc.clone();
    let myString = p1_arc.name.clone();

    let t1 = thread::spawn(move || {
        println!("Thread 1 starting...");
        println!("Test1: {}", myString);
        println!("Test1: {}", p1_arc.age);
        print_person(&p1_arc);
        thread::sleep(Duration::from_secs(5));
        println!("Thread 1 exiting...");
    });

    let t2 = thread::spawn(move || {
        println!("Thread 2 starting...");
        print_person(&p2_arc);
        println!("Thread 2 exiting...");
    });

    t1.join().unwrap();
    t2.join().unwrap();
}

That was my attempt at troubleshooting, but I haven't been able to reproduce the problem in that little example.

Now the actual problem code looks like this and it currently produces the error:

error[E0597]: `new_tda1` does not live long enough
  --> src/main.rs:70:33
   |
70 |         let strings_as_slice1 = new_tda1.vector_of_strings.as_slice();
   |                                 ^^^^^^^^ borrowed value does not live long enough
...
90 |     });
   |     -- borrowed value needs to live until here
   |     |
   |     `new_tda1` dropped here while still borrowed

error: aborting due to previous error

For more information about this error, try `rustc --explain E0597`.
error: Could not compile `file_io`.

To learn more, run the command again with --verbose



use std::fs::File;
use std::io::prelude::*;
use std::thread;
use std::sync::Arc;
extern crate regex;
use regex::Regex;
const NUMBER_OF_THREADS: usize = 5;
struct ThreadData {
    vector_of_strings: Vec<String>,
    terms: Vec<& 'static str>,
    quotient: usize
}
fn main()
{
    
   let mut file = File::open("info2.txt").expect("Can't open file!");
   let mut terms: Vec<&str> = Vec::new();
   construct_regex(&mut terms);
   let mut contents = String::new();

   file.read_to_string(&mut contents) //TODO: Uhhh... This should prob be a memory map at some point
     .expect("Oops! Can not read the file...");

    let vector_of_strings: Vec<String> = contents.split("\n").map(|s| s.to_string()).collect();
    let total_lines: usize = vector_of_strings.len();
 
    println!("Total # of lines in file: {}", total_lines);
    let quotient: usize = total_lines / NUMBER_OF_THREADS;
    
    let td = ThreadData {
        vector_of_strings: vector_of_strings,
        terms: terms, 
        quotient
    };
    let td_arc = Arc::new(td);
    threaded_search(td_arc);

 println!("Done, exiting...");


}

fn threaded_search<'a>(td_arc: Arc<ThreadData>)
{

    let new_tda1 = td_arc.clone();
    let new_tda2 = td_arc.clone();
    let new_tda3 = td_arc.clone();
    let new_tda4 = td_arc.clone();
    let new_tda5 = td_arc.clone();
    let strings_as_slice1 = new_tda1.vector_of_strings.as_slice();
    let strings_as_slice2 = new_tda2.vector_of_strings.as_slice();
    let strings_as_slice3 = new_tda3.vector_of_strings.as_slice();
    let strings_as_slice4 = new_tda4.vector_of_strings.as_slice();
    let strings_as_slice5 = new_tda5.vector_of_strings.as_slice();
    let handle1 = thread::spawn(move || {perform_search(&strings_as_slice1[0..new_tda1.quotient], &new_tda1.terms);});
    let handle2 = thread::spawn(move || {perform_search(&strings_as_slice2[new_tda2.quotient..new_tda2.quotient*2], &new_tda2.terms);});
    let handle3 = thread::spawn(move || {perform_search(&strings_as_slice3[new_tda2.quotient*2..new_tda2.quotient*3], &new_tda3.terms);});
    let handle4 = thread::spawn(move || {perform_search(&strings_as_slice4[new_tda3.quotient*3..new_tda3.quotient*4], &new_tda4.terms);});
    let handle5 = thread::spawn(move || {perform_search(&strings_as_slice5[new_tda4.quotient*4..new_tda4.quotient*5], &new_tda5.terms);});
    handle1.join().unwrap(); 
    handle2.join().unwrap();
    handle3.join().unwrap();
    handle4.join().unwrap();
    handle5.join().unwrap();
}

Please note that most of this code at this point is "troubleshooting code" (it's been many, many hours of troubleshooting/learning, since I'm new to Rust and need to do concurrency right away): not how it was originally written, but modified in an attempt to stop errors. For the real application I'll do a rewrite without the redundant variables and such. I'm having a difficult time simplifying this question any further, because when I change one thing in one area of my code, another area is then flagged as an error. This has gone on for a long time and I've gotten stuck, but I am aware that this problem definitely has to do with ownership. Thanks for your guidance.


You want to do the following instead:

let handle1 = thread::spawn(move || {
    let strings_as_slice1 = new_tda1.vector_of_strings.as_slice();
    perform_search(&strings_as_slice1[0..new_tda1.quotient], &new_tda1.terms);
});

Note that new_tda1 is moved into the closure, and the slice is created there. The problem is that you were creating the slice in threaded_search, borrowing from the Arc that lives there; the closure then borrows that slice and so isn't 'static, which is a requirement for any closure given to thread::spawn. I can elaborate on this bit if you'd like, but I'll stop here for now. The same issue is present for your other slices.
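
Applied to all five threads, a minimal sketch might look like the following. It reuses the ThreadData struct, NUMBER_OF_THREADS constant, and perform_search function from your code, so treat it as a shape rather than drop-in code; the last chunk also picks up any lines left over by the integer division:

fn threaded_search(td_arc: Arc<ThreadData>) {
    let mut handles = Vec::new();
    for i in 0..NUMBER_OF_THREADS {
        let td = td_arc.clone();
        handles.push(thread::spawn(move || {
            // Slice inside the closure, after `td` has been moved in, so the
            // borrow points at data the closure itself owns.
            let start = i * td.quotient;
            let end = if i == NUMBER_OF_THREADS - 1 {
                td.vector_of_strings.len() // last chunk takes the remainder
            } else {
                (i + 1) * td.quotient
            };
            perform_search(&td.vector_of_strings[start..end], &td.terms);
        }));
    }
    for handle in handles {
        handle.join().unwrap();
    }
}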

You should look into a few libraries (crates) that may assist you:

  1. rayon - a work-stealing threadpool API that lets you express the algorithm via Iterator-like combinators; it does the work splitting, thread assignment, etc. for you.
  2. crossbeam - this can give you "scoped" threads: threads that can, in fact, borrow something from the current thread's environment. Put another way, the closure that will run on a different thread can access variables in the threaded_search fn, for example (there's a sketch right after this list).
  3. Lastly, if you're actually doing grep-like text search, ripgrep allows you to use it as a library. You may be able to just use that outright, without any extra threads.
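
To make the crossbeam option concrete, here's a rough sketch of scoped threads for this task. It reuses NUMBER_OF_THREADS from your code and assumes perform_search takes plain slices (&[String] and &[&str]); the scope()/spawn(|_| ...) shape is the one in recent crossbeam releases, where scope() returns a Result:

extern crate crossbeam;

fn scoped_search(lines: &[String], terms: &[&str]) {
    // +1 so the chunk size is never zero and the division remainder is covered.
    let chunk_size = lines.len() / NUMBER_OF_THREADS + 1;
    crossbeam::scope(|s| {
        for chunk in lines.chunks(chunk_size) {
            // The closure borrows `chunk` and `terms` straight from this stack
            // frame; the scope joins every thread before those borrows end, so
            // no Arc (and no 'static bound) is required.
            s.spawn(move |_| perform_search(chunk, terms));
        }
    })
    .unwrap(); // joins all spawned threads; panics if any of them panicked
}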

If you're effectively processing things line-by-line, Rayon even has par_lines() that you could use to iterate on the entire buffer. It might look something like:

extern crate rayon;
use rayon::prelude::*;

fn main() {
    let data: String = read_file();
    let terms = ["foo", "bar"];

    let matches: Vec<&str> = data.par_lines()
        .filter(|line| compare(line, &terms))
        .collect();

    for line in matches {
        println!("{}", line);
    }
}
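
To make that sketch compile as-is, the two placeholders might be filled in along these lines; plain substring matching stands in for the regex-based matching in the original code, and "info2.txt" is just the file name from the question:

fn read_file() -> String {
    // Read the whole file into one String, as in the original program.
    std::fs::read_to_string("info2.txt").expect("Can't open file!")
}

fn compare(line: &str, terms: &[&str]) -> bool {
    // A line matches if it contains any of the search terms.
    terms.iter().any(|&term| line.contains(term))
}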

I just wanted to say thanks to both of you. @vitalyd, as far as the error I was getting, you're 100% right... That was literally all I had to fix... That little bit took me about a day of trying and failing, so that tip is much appreciated! Additionally, thanks for the recommendation of ripgrep; I have ripgrep but haven't attempted to use it "as a library", so thank you.

@cuviper I also highly appreciate your advice; I figured there was probably a much easier way to go about the task, but I'm also happy that I'm starting "from scratch", because I've always wanted to get proficient at concurrent programming, and I think choosing Rust as the language to do it in is probably a great choice.


Sure, I wouldn't fault anyone for wanting to learn this stuff. I just like to point out cases that seem well suited for rayon. :slight_smile:


This thread turns out to have inspired me a bit. It looked a lot like a problem my desync crate ought to be really good at: it works with streams rather than in-memory buffers, so it should work well with large data sets where holding everything in memory would be prohibitive. The remaining problem was distributing the work amongst several workers. I'd implemented a simple pub-sub library in flo_stream which could do the job if there were a way to publish to the first idle subscriber, so I've added a new SinglePublisher there to do exactly that.

This then shook out a bunch of bugs that occurred when streams close. Those are now fixed :slight_smile:

I've turned my test code into an example, which is now here: parallel_counter.rs. There's a certain amount of caveat emptor here as these are my crates and not at all mainstream, but it's definitely kind of neat that the dummy code to count the number of 0s in a stream is about the same length as the scheduling code.

Here's what's happening in the example:

You can split up a futures Stream into chunks using the chunks() call. For multi-gigabyte tasks that's probably better than trying to read everything into memory at once. I've made a stream of random numbers in the example.

Desync provides a pipe() call for processing streams in the background. count_zeros() in the example is a function that reads chunks of data from a stream and counts the number of zeros in each.

We could just send the chunks straight to count_zeros(), but that would only create a single worker thread. So instead, I create a SinglePublisher and send the chunks there (using send_all()). Then I create five count_zeros() workers and provide them with subscriber streams. Now it'll process the data on 5 threads.

Streams have a concept called 'backpressure', which pauses the flow of data while processing is taking place. With SinglePublisher it also means that when data is available it will be sent to the first available thread, so if one thread is running slow there's no chance of starvation. It also keeps memory usage down when reading from giant files, as it'll stop reading while the threads are all busy.

The last thing is another Desync into which the 5 result streams are piped, to provide some feedback on the current status and display a running total.
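
For anyone who wants the shape of that setup without pulling in desync or flo_stream, here's a rough std-only sketch of the same two ideas: a bounded sync_channel provides the backpressure, and a shared receiver behind a mutex plays the role of SinglePublisher, handing each chunk to whichever worker is idle. The chunk data, worker count, and names here are all made up for illustration:

use std::sync::{mpsc, Arc, Mutex};
use std::thread;

fn main() {
    // A bounded channel: send() blocks once 2 chunks are queued, which is the
    // "backpressure" idea - the producer stops reading ahead while every
    // worker is busy.
    let (tx, rx) = mpsc::sync_channel::<Vec<u32>>(2);
    let rx = Arc::new(Mutex::new(rx));

    // Five workers all pull from the same receiver, so each chunk goes to
    // whichever worker is free - a slow worker never starves the others.
    let workers: Vec<_> = (0..5)
        .map(|id| {
            let rx = rx.clone();
            thread::spawn(move || {
                let mut zeros = 0usize;
                loop {
                    // Lock only long enough to take the next chunk.
                    let chunk = match rx.lock().unwrap().recv() {
                        Ok(chunk) => chunk,
                        Err(_) => break, // channel closed: no more chunks
                    };
                    zeros += chunk.iter().filter(|&&n| n == 0).count();
                }
                println!("worker {} counted {} zeros", id, zeros);
                zeros
            })
        })
        .collect();

    // Producer: emit chunks of dummy data instead of reading a huge file.
    for i in 0..100u32 {
        tx.send((0..1024).map(|n| (n + i) % 7).collect()).unwrap();
    }
    drop(tx); // dropping the sender lets each worker's recv() return Err and exit

    let total: usize = workers.into_iter().map(|h| h.join().unwrap()).sum();
    println!("total zeros: {}", total);
}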
