Reading a file 4x faster using 4 threads (Works - threaded is faster!)

Hello

I want to make a small program to search a word in a file, but I'd like to implement threads to do it faster.
This is how I'd do it:

  1. With std::io::Seek I can change the file-pointer. The file-pointer is where Rust starts reading a file. Every time a character is read/scanned it increases by one.

  2. I'd make a function like this search_for_word(word : String, start : u64, end: u64) where word is the word we're searching for, start is the position where the function will set the file-pointer and should start reading. When the file-pointer reaches end the file-pointer should stop reading.

  3. I'll make four threads to call this function. If the file would contain 100 characters it'd go like this:

    • Thread A: search_for_word("word", 0, 25)
    • Thread B: search_for_word("word", 25, 50)
    • Thread C: search_for_word("word", 50, 75)
    • Thread D: search_for_word("word", 75, 100)
  4. The file to search would be a global/static/... variable known by all threads.

  5. This way every thread only has to read 25 characters, and because four threads read their 25 characters at the same time it will be four times faster than one thread having to read all the 100 characters.
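
A minimal sketch of steps 1 and 2, assuming the word is plain ASCII and that one range fits in memory. The name search_range and its signature are made up for illustration and are not from the post above. It seeks to start, reads at most end - start bytes, and returns the absolute offset of the first match in that range.

```rust
use std::fs::File;
use std::io::{self, Read, Seek, SeekFrom};
use std::path::Path;

/// Hypothetical helper: search `word` in the byte range `start..end` of the file.
/// Returns the absolute byte offset of the first match, if any.
fn search_range(path: &Path, word: &str, start: u64, end: u64) -> io::Result<Option<u64>> {
    let mut file = File::open(path)?;
    // Move the file cursor to the first byte of this range.
    file.seek(SeekFrom::Start(start))?;

    // Read at most `end - start` bytes; fewer if the file ends earlier.
    let mut buf = Vec::new();
    file.take(end.saturating_sub(start)).read_to_end(&mut buf)?;

    let needle = word.as_bytes();
    if needle.is_empty() {
        return Ok(None);
    }
    // Plain substring search over the bytes of this range.
    let hit = buf.windows(needle.len()).position(|w| w == needle);
    Ok(hit.map(|p| start + p as u64))
}
```

Each thread would open its own File handle here, so no global variable is needed. Whether this is actually faster than a single sequential read depends on the disk and the operating system.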

Would this work? Or do I have to do it in another way? Thanks!

Your bottleneck is the speed of disk-IO, not the speed of the CPU, so I doubt this will help at all.


I would highly recommend reading into ripgrep for performance hints.


For the reverse complement benchmark from the Computer Language Benchmarks Game, I found the fastest strategy was to read the entire file into memory on one thread, split it up into chunks, and process these chunks in parallel on separate threads. In general, if you have enough memory to hold the entire file at once, reading it into a single pre-allocated buffer up front (e.g. using fs::read) is often much faster than doing multiple smaller reads.

You also need to be careful around the boundaries where you split the file. For example, what if the boundary falls in the middle of your search word? You need to account for the case where the word starts in one chunk and continues into the next.

One way to do this would be to read (word.len() - 1) characters past the start of the next chunk. For your example, this would look something like this:

  • Thread A: search_for_word("word", 0, 28)
  • Thread B: search_for_word("word", 25, 53)
  • Thread C: search_for_word("word", 50, 78)
  • Thread D: search_for_word("word", 75, 100)

This will fix the case where "word" is found starting at offset 24 and ending at offset 27 within the file.
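
The overlapping ranges above can be computed rather than written out by hand. This is only a sketch: chunk_ranges is a hypothetical helper name, and it assumes byte offsets into a buffer that is already in memory.

```rust
/// Hypothetical helper: split `len` bytes into `parts` ranges, each extended by
/// `word_len - 1` bytes so a match straddling a boundary is still seen.
fn chunk_ranges(len: usize, parts: usize, word_len: usize) -> Vec<(usize, usize)> {
    assert!(parts > 0 && word_len > 0);
    let chunk = len / parts;
    let overlap = word_len - 1;
    (0..parts)
        .map(|i| {
            let start = i * chunk;
            // The last chunk always runs to the end of the file.
            let end = if i + 1 == parts {
                len
            } else {
                ((i + 1) * chunk + overlap).min(len)
            };
            (start, end)
        })
        .collect()
}
```

Calling chunk_ranges(100, 4, 4) gives (0, 28), (25, 53), (50, 78), (75, 100), the same ranges as in the list. Because the overlap is one byte shorter than the word, a plain substring match fits entirely inside exactly one range, so it should not be reported twice.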


This is what I have done so far by the way, for those stumbling on this thread in the future and wondering.

/*
 *  The purpose of this program is to find a specific word
 *  in a text file very fast. To achieve this we use threads.
 *
 */

use std::path::Path;
use std::vec::Vec;

/**
 * 
 * Function to search for a specific word in a file.
 *
 * # Arguments
 * * `word` - The word to search in file. (The needle).
 * * `file` - The file to search in, is type std::vec::Vec. (The haystack).
 * * `start` - Position to start searching from.
 * * `end` - When this position is reached the search is discontinued.
 *
 * # Return
 * Returns an integer representing the position where `word` is found.
 * Returns -1 if `word` is not found.
 */
fn search_for_word(word: String, file: Vec<u8> , start: u64, end: u64) -> i64 {
    // Checking if all parameters are valid
    if start >= end {
        panic!("Parameter `end` should have a bigger value than `start`! Aborting...");
    }
    if word.len() == 0 || word.contains(' ') {
        panic!("Parameter `word` had an invalid value! Aborting...");
    }

    // Looping over all characters in file starting from `start` to `end`
    let mut word_buffer : String = "".to_string();
    for i in start..end {
        let character: char = file[i as usize] as char;

        // If character is a space (AKA a word has passed) or end of
        // file is reached: check if word matches with our needle.
        if character == ' ' || i+1 == file.len() as u64 {
            if word_buffer == word {
                // Matches!
                println!("{} is found at position {}!", word, 1+i-word.len() as u64);
                return i as i64; // ??
            }

            word_buffer = "".to_string();
            continue;
        }

        // Appending character to word until space (AKA end of word)
        word_buffer = format!("{}{}", word_buffer, character);
    };

    -1
}

fn main() {

    // Path to file, and file read into memory (std::vec::Vec)
    let path : &Path = Path::new("src/file.txt");
    let file = std::fs::read(path).expect("File not found...");

    // Word to search
    let word : String = "def".to_string();

    // Amount of chars in file
    let length : u64 = file.len() as u64;

    // Dividing in four chunks to divide over the four threads
    let mut one_fourth : u64 = length / 4;

    // In special scenario where our file is so small that
    // `one_fourth` results in being smaller than the length of
    // the word, make chunks smaller.
    if one_fourth <= word.len() as u64 {
        one_fourth = word.len() as u64 -1;
    }

    // Vector holding threads
    let mut threads = Vec::new();

    // Making four separate threads calling the `search_for_word` function
    for i in 0..4 {
        let file = file.clone();
        let word = word.clone();

        // Determining start and end of this chunk
        let start : u64 = i * one_fourth;
        let mut end : u64 = (i+1) * one_fourth + (word.len() -1) as u64;
        if end > file.len() as u64 || i == 3 {
            end = file.len() as u64;
        }

        let thread = std::thread::spawn(move || {
            search_for_word(word.clone(), file, start, end);
        });
        threads.push(thread);
    }  

    // Waiting for all threads to finish
    for thread in threads {
        thread.join().expect("Unknown thread error occurred.");
    }

}

file.txt

abc def 123

It works fine. This is what I still have to do:

  • Make all other threads stop when the word is found in one thread.
  • Catch the return value from the thread that found it.
  • Add a timer to see how fast the word was found, so I can compare it to non-threaded code.
  • Get rid of the many as u64 casts somehow; there are too many IMHO (not that important).

I'll try to figure out those four things on my own, but suggestions/hints are always welcome though.

It's known that APFS (Apple File System) has a global kernel lock that is acquired for every operation touching the file system. In that case, parallelism on the file system cannot make operations faster; it only pays for the thread spawning and synchronization cost.


Damn that sucks. But I'll finish this program anyways just for fun, and I think it has some nice scenarios to learn how to use threads in Rust.

So, I want to return the position where the word is found (pretending that the word can only exist once in the file). When the word is not found, the function search_for_word returns -1.
To do this I collect all the results from the threads into a vector. The only value in the vector that is not -1 is the position where the word is found.

The issue is on line 104 of src/main.rs though (the push inside the closure): I can't update my vector inside a move closure.

/*
 *  The purpose of this program is to find a specific word
 *  in a text file very fast. To achieve this we use threads.
 *
 */

use std::path::Path;
use std::vec::Vec;

/**
 * 
 * Function to search for a specific word in a file.
 *
 * # Arguments
 * * `word` - The word to search in file. (The needle).
 * * `file` - The file to search in, is type std::vec::Vec. (The haystack).
 * * `start` - Position to start searching from.
 * * `end` - When this position is reached the search is discontinued.
 *
 * # Return
 * Returns an integer representing the position where `word` is found.
 * Returns -1 if `word` is not found.
 */
fn search_for_word(word: String, file: Vec<u8> , start: u64, end: u64) -> i64 {
    // Checking if all parameters are valid
    if start >= end {
        panic!("Parameter `end` should have a bigger value than `start`! Aborting...");
    }
    if word.len() == 0 || word.contains(' ') {
        panic!("Parameter `word` had an invalid value! Aborting...");
    }

    // Looping over all characters in file starting from `start` to `end`
    let mut word_buffer : String = "".to_string();
    for i in start..end {
        let character: char = file[i as usize] as char;

        // If character is a space (AKA a word has passed) or end of
        // file is reached: check if word matches with our needle.
        if character == ' ' || i+1 == file.len() as u64 {
            if word_buffer == word {
                // Matches!
                //println!("{} is found at position {}!", word, 1+i-word.len() as u64);

                return i as i64; // ??
            }

            word_buffer = "".to_string();
            continue;
        }

        // Appending character to word until space (AKA end of word)
        word_buffer = format!("{}{}", word_buffer, character);
    };

    -1
}

fn main() {

    // Path to file, and file read into memory (std::vec::Vec)
    let path : &Path = Path::new("src/file.txt");
    let file = std::fs::read(path).expect("File not found...");

    // Word to search
    let word : String = "def".to_string();

    // Amount of chars in file
    let length : u64 = file.len() as u64;

    // Dividing in four chunks to divide over the four threads
    let mut one_fourth : u64 = length / 4;

    // In special scenario where our file is so small that
    // `one_fourth` results in being smaller than the length of
    // the word, make chunks smaller.
    if one_fourth <= word.len() as u64 {
        one_fourth = word.len() as u64 -1;
    }

    // Vector holding threads
    let mut threads = Vec::new();

    // Vector holding all returns from `search_for_word`
    let mut search_for_word_return_values : Vec<i64> = Vec::new();
    
    // Variable holding position where word is found
    let mut word_found_at : i64 = -1;

    // Making four separate threads calling the `search_for_word` function
    for i in 0..4 {
        let file = file.clone();
        let word = word.clone();

        // Determining start and end of this chunk
        let start : u64 = i * one_fourth;
        let mut end : u64 = (i+1) * one_fourth + (word.len() -1) as u64;
        if end > file.len() as u64 || i == 3 {
            end = file.len() as u64;
        }

        let thread = std::thread::spawn(move || {
            let x = search_for_word(word.clone(), file, start, end);
            search_for_word_return_values.push(x);
        });
        threads.push(thread);
    }

    // Highest value (AKA the only value not -1) will be the position
    // where the word is found. Only the thread who found the word will not return -1
    for val in search_for_word_return_values {
        if val != -1 {
            word_found_at = val;
        }
    }

    // Waiting for all threads to finish
    for thread in threads {
        thread.join().expect("Unknown thread error occurred.");
    }
                
    println!("{} is found at position {}!", word, word_found_at);

}

Error

error[E0382]: use of moved value: `search_for_word_return_values`
   --> src/main.rs:102:41
    |
85  |     let mut search_for_word_return_values : Vec<i64> = Vec::new();
    |         --------------------------------- move occurs because `search_for_word_return_values` has type `std::vec::Vec<i64>`, which does not implement the `Copy` trait
...
102 |         let thread = std::thread::spawn(move || {
    |                                         ^^^^^^^ value moved into closure here, in previous iteration of loop
103 |             let x = search_for_word(word.clone(), file, start, end);
104 |             search_for_word_return_values.push(x);
    |             ----------------------------- use occurs due to use in closure

error: aborting due to previous error

Try something like

    // Needs `use std::sync::{Arc, Mutex};` at the top of the file.
    // Vector holding all returns from `search_for_word`
    let search_for_word_return_values : Arc<Mutex<Vec<i64>>> = Arc::new(Mutex::new(Vec::new()));
    
    // Variable holding position where word is found
    let mut word_found_at : i64 = -1;

    // Making four separate threads calling the `search_for_word` function
    for i in 0..4 {
        let file = file.clone();
        let word = word.clone();

        // Determining start and end of this chunk
        let start : u64 = i * one_fourth;
        let mut end : u64 = (i+1) * one_fourth + (word.len() -1) as u64;
        if end > file.len() as u64 || i == 3 {
            end = file.len() as u64;
        }

        let search_for_word_return_values = search_for_word_return_values.clone();

        let thread = std::thread::spawn(move || {
            let x = search_for_word(word.clone(), file, start, end);
            search_for_word_return_values.lock().unwrap().push(x);
        });
        threads.push(thread);
    }

    // Waiting for all threads to finish
    for thread in threads {
        thread.join().expect("Unknown thread error occurred.");
    }

    // Highest value (AKA the only value not -1) will be the position
    // where the word is found. Only the thread who found the word will not return -1
    for val in search_for_word_return_values.lock().unwrap() {
        if val != -1 {
            word_found_at = val;
        }
    }

This wraps search_for_word_return_values in Arc and Mutex to make it sharable between multiple threads and threadsafe to access. Also I moved the join above the for val in search_for_word_return_values, otherwise the threads are likely to not have done anything yet when trying to get their results.


Also in this case you could just return the value from the thread:

        let thread = std::thread::spawn(move || {
            search_for_word(word.clone(), file, start, end)
        });
        threads.push(thread);
    }

    // Highest value (AKA the only value not -1) will be the position
    // where the word is found. Only the thread who found the word will not return -1
    for thread in threads {
        let val = thread.join().expect("Thread panicked.");
        if val != -1 {
            word_found_at = val;
        }
    }

Also if you are trying to speed up things, you should start by making one thread efficient before thinking about parallelization. There are two things that could be improved immediately:

  • word_buffer = "".to_string(); throws away the existing allocation, so the next push has to allocate again. Use word_buffer.clear() instead to reuse the allocation. (An empty string is also better created with the simpler String::new() than with "".to_string().)
  • word_buffer = format!("{}{}", word_buffer, character); is a Schlemiel's algorithm; worse yet, it also forces a new allocation upon processing each individual character. Use word_buffer.push(character); instead.

Pushing works, but looping over the vector yields an error.

    // Highest value (AKA the only value not -1) will be the position
    // where the word is found. Only the thread who found the word will not return -1
    for val in search_for_word_return_values.lock().unwrap() {
        if val != -1 {
            word_found_at = val;
        }
    }

Error

error[E0277]: `std::sync::MutexGuard<'_, std::vec::Vec<i64>>` is not an iterator
   --> src/main.rs:114:16
    |
114 |     for val in search_for_word_return_values.lock().unwrap() {
    |                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ `std::sync::MutexGuard<'_, std::vec::Vec<i64>>` is not an iterator
    |
    = help: the trait `std::iter::Iterator` is not implemented for `std::sync::MutexGuard<'_, std::vec::Vec<i64>>`
    = note: required by `std::iter::IntoIterator::into_iter`

error: aborting due to previous error

For more information about this error, try `rustc --explain E0277`.
error: could not compile `fast_read`.

To learn more, run the command again with --verbose.

Try this instead:

    for val in search_for_word_return_values.lock().unwrap().iter() {


Works perfectly!

    // Highest value (AKA the only value not -1) will be the position
    // where the word is found. Only the thread who found the word will not return -1
    for val in search_for_word_return_values.lock().unwrap().iter() {
        if *val != -1 {
            word_found_at = *val;
        }
    }


Thanks for your suggestions! Implemented them. Had to use word_buffer.push(character) though instead of append.

I finished my program and it works! My threaded version works A LOT faster!!!!
So cool.

Here is the version with no threads (same code, but all thread-related code deleted).

/*
 *  The purpose of this program is to find a specific word
 *  and see how fast it goes.
 *
 */

use std::path::Path;
use std::vec::Vec;
use std::time::Instant;

/**
 * 
 * Function to search for a specific word in a file.
 *
 * # Arguments
 * * `word` - The word to search in file. (The needle).
 * * `file` - The file to search in, is type std::vec::Vec. (The haystack).
 * * `start` - Position to start searching from.
 * * `end` - When this position is reached the search is discontinued.
 *
 * # Return
 * Returns an integer representing the position where `word` is found.
 * Returns -1 if `word` is not found.
 */
fn search_for_word(word: String, file: Vec<u8> , start: u64, end: u64) -> i64 {
    // Checking if all parameters are valid
    if start >= end {
        panic!("Parameter `end` should have a bigger value than `start`! Aborting...");
    }
    if word.len() == 0 || word.contains(' ') {
        panic!("Parameter `word` had an invalid value! Aborting...");
    }

    // Looping over all characters in file starting from `start` to `end`
    let mut word_buffer : String = String::new();
    for i in start..end {
        let character: char = file[i as usize] as char;

        // If character is a space (AKA a word has passed) or end of
        // file is reached: check if word matches with our needle.
        if character == ' ' || i+1 == file.len() as u64 {
            if word_buffer == word {
                // Matches!
                //println!("{} is found at position {}!", word, 1+i-word.len() as u64);

                return i as i64; // ??
            }

            word_buffer.clear();
            continue;
        }

        // Appending character to word until space (AKA end of word)
        word_buffer.push(character);
    };

    -1
}

fn main() {
    // Timing how long this program takes to execute
    let now = Instant::now();

    // Path to file, and file read into memory (std::vec::Vec)
    let path : &Path = Path::new("src/file.txt");
    let file = std::fs::read(path).expect("File not found...");

    // Word to search
    let word : String = "myneedle".to_string();

    // Amount of chars in file
    let length : u64 = file.len() as u64;
    
    let x = search_for_word(word.clone(), file, 0, length);
    

    println!("{} is found at position {}!", word, x);

    println!("Program finished in {} ms", now.elapsed().as_millis());
}

And this is the super fast threaded version!!

/*
 *  The purpose of this program is to find a specific word
 *  in a text file very fast. To achieve this we use threads.
 *
 */

use std::path::Path;
use std::vec::Vec;
use std::sync::Arc;
use std::sync::Mutex;
use std::time::Instant;

/**
 * 
 * Function to search for a specific word in a file.
 *
 * # Arguments
 * * `word` - The word to search in file. (The needle).
 * * `file` - The file to search in, is type std::vec::Vec. (The haystack).
 * * `start` - Position to start searching from.
 * * `end` - When this position is reached the search is discontinued.
 *
 * # Return
 * Returns an integer representing the position where `word` is found.
 * Returns -1 if `word` is not found.
 */
fn search_for_word(word: String, file: Vec<u8> , start: u64, end: u64) -> i64 {
    // Checking if all parameters are valid
    if start >= end {
        panic!("Parameter `end` should have a bigger value than `start`! Aborting...");
    }
    if word.len() == 0 || word.contains(' ') {
        panic!("Parameter `word` had an invalid value! Aborting...");
    }

    // Looping over all characters in file starting from `start` to `end`
    let mut word_buffer : String = String::new();
    for i in start..end {
        let character: char = file[i as usize] as char;

        // If character is a space (AKA a word has passed) or end of
        // file is reached: check if word matches with our needle.
        if character == ' ' || i+1 == file.len() as u64 {
            if word_buffer == word {
                // Matches!
                //println!("{} is found at position {}!", word, 1+i-word.len() as u64);

                return i as i64; // ??
            }

            word_buffer.clear();
            continue;
        }

        // Appending character to word until space (AKA end of word)
        word_buffer.push(character);
    };

    -1
}

fn main() {
    // Timing how long this program takes to execute
    let now = Instant::now();

    // Path to file, and file read into memory (std::vec::Vec)
    let path : &Path = Path::new("src/file.txt");
    let file = std::fs::read(path).expect("File not found...");

    // Word to search
    let word : String = "myneedle".to_string();

    // Amount of chars in file
    let length : u64 = file.len() as u64;

    // Dividing in four chunks to divide over the four threads
    let mut one_fourth : u64 = length / 4;

    // In special scenario where our file is so small that
    // `one_fourth` results in being smaller than the length of
    // the word, make chunks smaller.
    if one_fourth <= word.len() as u64 {
        one_fourth = word.len() as u64 -1;
    }

    // Vector holding threads
    let mut threads = Vec::new();

    // Vector holding all returns from `search_for_word`
    let search_for_word_return_values : Arc<Mutex<Vec<i64>>> = Arc::new(Mutex::new(Vec::new()));
    
    // Variable holding position where word is found
    let mut word_found_at : i64 = -1;

    // Making four separate threads calling the `search_for_word` function
    for i in 0..4 {
        let file = file.clone();
        let word = word.clone();

        // Determining start and end of this chunk
        let start : u64 = i * one_fourth;
        let mut end : u64 = (i+1) * one_fourth + (word.len() -1) as u64;
        if end > file.len() as u64 || i == 3 {
            end = file.len() as u64;
        }

        let search_for_word_return_values = search_for_word_return_values.clone();
        let thread = std::thread::spawn(move || {
            let x = search_for_word(word.clone(), file, start, end);
            search_for_word_return_values.lock().unwrap().push(x);
        });
        threads.push(thread);
    }


    // Waiting for all threads to finish
    for thread in threads {
        thread.join().expect("Unknown thread error occurred.");
    }
    
    // Highest value (AKA the only value not -1) will be the position
    // where the word is found. Only the thread who found the word will not return -1
    for val in search_for_word_return_values.lock().unwrap().iter() {
        if *val != -1 {
            word_found_at = 1 + *val - word.len() as i64;
        }
    }
                
    println!("{} is found at position {}!", word, word_found_at);

    println!("Program finished in {} ms", now.elapsed().as_millis());
}

I've used a famous text file called alice29.txt (152 kB) and placed the word "myneedle" somewhere near the end.

My threaded version can find this word in less than 7ms while the non-threaded version sometimes even takes 20ms!!!!!!! That literally is close to 4 times faster!!

That is 4x faster because the word you're looking for is not in a random position, but near the end of the last block that is searched, and you are scanning four blocks at the same time instead of scanning a single block. So you reach the end faster. If you have a thread for each CPU core, this basically optimises for the worst-case scenario.

What if the word you're looking for is near the start of the file? From what I see, you now have to wait for 3 other threads to finish as well, instead of finishing as soon as there's a match.
How much slower is your threaded version for this case?

It would be interesting to see what the execution times are for a wide variety of cases, with the search term randomly inserted at different locations in the file and the execution times averaged across a large number of runs.

Another question, as I'm new to Rust myself I don't know what exactly file.clone() does. I expect it to make a full copy of the underlying char array.
That is not very efficient. Can the original memory, which is read-only, not be shared across threads using references?
Same applies to the word variable, but as it's much shorter it's not such an issue there.


You're right. The drop-in solution here is to use Arc (ref counted smart pointer). If one switches to using an implementation of scoped threads, for example using rayon, it's also possible to just use references. But in this place, with std::thread::spawn, it's not. (The thread is not bound to finish in any particular scope, so references cannot be shared with it.)
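
A sketch of the reference-sharing variant, for illustration only. It uses std::thread::scope, which has been part of the standard library since Rust 1.63 and plays the role of the scoped threads mentioned above; find_in_ranges is a hypothetical name, and it does a plain substring search rather than the word matching from the original program. With std::thread::spawn, the drop-in alternative would be to wrap the buffer in Arc<Vec<u8>> once, so that each clone copies only a pointer.

```rust
use std::thread;

/// Hypothetical helper: search each `(start, end)` range of `haystack` on its own
/// thread, borrowing the buffer instead of cloning it.
fn find_in_ranges(haystack: &[u8], needle: &[u8], ranges: &[(usize, usize)]) -> Option<usize> {
    if needle.is_empty() {
        return None;
    }
    thread::scope(|scope| {
        // Spawn one scoped thread per range; each borrows `haystack` and `needle`.
        let handles: Vec<_> = ranges
            .iter()
            .map(|&(start, end)| {
                scope.spawn(move || {
                    haystack[start..end]
                        .windows(needle.len())
                        .position(|w| w == needle)
                        .map(|p| start + p)
                })
            })
            .collect();
        // Join every thread and keep the lowest match offset, if any.
        handles
            .into_iter()
            .filter_map(|h| h.join().expect("search thread panicked"))
            .min()
    })
}
```

The scope guarantees that all threads have finished before find_in_ranges returns, which is what makes the plain references acceptable to the borrow checker.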


I'm not sure if anyone's mentioned this before, but you can skip a lot of the copying that happens when a file is read (i.e. copying the data from kernel space to user space) by memory-mapping the file.

This is where the kernel maps the entire file into your program's address space so it appears as a normal &[u8]. The kernel then wires up your computer's virtual memory manager so that the relevant part of the file is read into memory as you try to access it. Depending on your use case, this may be faster than reading from std::fs::File.


Another optimisation would be not to copy characters into the word-buffer and then compare the words, but to compare each character with the corresponding character of word as you scan past it, advancing your pointer into word as long as the characters match and, when they don't match, just scanning for the next word-break character.
You need a slightly more complicated state machine in your code, and need to be careful with partial matches both when word is shorter and when it's longer than the word at your current position in the file.
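
One possible shape for that state machine, as a rough sketch. It still treats a single space as the only word break, like the original program, and find_word together with the variables matched and alive are illustrative names.

```rust
/// Hypothetical helper: find the first space-delimited word equal to `needle`
/// without copying bytes into a temporary buffer.
fn find_word(haystack: &[u8], needle: &[u8]) -> Option<usize> {
    if needle.is_empty() {
        return None;
    }
    let mut word_start = 0; // offset where the current word began
    let mut matched = 0; // how many bytes of `needle` the current word has matched
    let mut alive = true; // false once the current word can no longer match

    for (i, &byte) in haystack.iter().enumerate() {
        if byte == b' ' {
            // Word ended: it is a hit only if it matched the whole needle.
            if alive && matched == needle.len() {
                return Some(word_start);
            }
            word_start = i + 1;
            matched = 0;
            alive = true;
        } else if alive && matched < needle.len() && byte == needle[matched] {
            matched += 1;
        } else {
            // Mismatch, or the word is longer than the needle.
            alive = false;
        }
    }
    // The last word has no trailing space.
    if alive && matched == needle.len() {
        Some(word_start)
    } else {
        None
    }
}
```

The two partial-match cases show up as the checks on matched: a word shorter than the needle ends with matched below needle.len(), and a word longer than the needle sets alive to false on its first extra byte.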

Which brings me to the realisation that your program doesn't seem to care about linebreaks or punctuation, so for instance someword\nmyneedle would not find myneedle.
You'll need a better word-boundary match than just char ' '.
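
A small sketch of a broader word-boundary check. The big assumption here is that a "word" is a run of ASCII letters or digits, so newlines and punctuation all count as breaks; real text (apostrophes, non-ASCII letters) would need a more careful definition. Both function names are hypothetical.

```rust
/// Assumption for this sketch: a "word" is a run of ASCII letters or digits.
fn is_word_byte(b: u8) -> bool {
    b.is_ascii_alphanumeric()
}

/// Hypothetical helper: first offset where `needle` occurs as a whole word.
fn find_whole_word(haystack: &[u8], needle: &[u8]) -> Option<usize> {
    if needle.is_empty() || needle.len() > haystack.len() {
        return None;
    }
    (0..=haystack.len() - needle.len()).find(|&start| {
        let end = start + needle.len();
        &haystack[start..end] == needle
            // The byte before the match must not belong to a word...
            && (start == 0 || !is_word_byte(haystack[start - 1]))
            // ...and neither must the byte after it.
            && (end == haystack.len() || !is_word_byte(haystack[end]))
    })
}
```

With this check, someword\nmyneedle yields a match at offset 9, while notmyneedle yields none.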

And finally, you divide the file into chunks but don't care what happens at the start of each chunk. If the file contains the word notmyneedle and the chunk boundary falls exactly between the t and the m, one of the threads will report a hit on myneedle even though it's not an exact word match.
You basically need to start scanning each chunk except the first at boundary-1 and then scan for the first space in that chunk.
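
The boundary-1 rule could look roughly like this. It is a sketch that keeps the space-only word break of the original program; first_word_start is a hypothetical name.

```rust
/// Hypothetical helper: offset of the first word that starts at or after
/// `chunk_start`, so a word cut by the chunk boundary is not matched in half.
fn first_word_start(haystack: &[u8], chunk_start: usize) -> usize {
    if chunk_start == 0 {
        return 0;
    }
    // Begin one byte early: if that byte is a space, a word starts right at the boundary.
    haystack[chunk_start - 1..]
        .iter()
        .position(|&b| b == b' ')
        .map(|p| chunk_start - 1 + p + 1)
        .unwrap_or(haystack.len())
}
```

A thread would begin matching at first_word_start(file, start) instead of at start. The word that straddles the boundary then belongs to the previous chunk, which is why that chunk has to read a little past its own end.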

Complications, complications! And public code review can start to feel brutal perhaps... :wink:


@tnleeuw You're right. When I insert myneedle somewhere near the beginning, the non-threaded version is faster. How can I terminate the other threads once one thread has found the word?

Will be implementing your other suggestions too but this seemed the most important.
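
One common approach to that question is cooperative cancellation: the standard library offers no way to kill a thread from the outside, so each worker periodically checks a shared flag and returns early once it is set. The sketch below assumes a plain substring search and scoped threads (std::thread::scope, Rust 1.63+); find_any_hit and the polling interval of 4096 are arbitrary illustrative choices.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;

/// Hypothetical helper: search the given ranges in parallel and let every
/// thread give up once any thread has found a match.
fn find_any_hit(haystack: &[u8], needle: &[u8], ranges: &[(usize, usize)]) -> Option<usize> {
    if needle.is_empty() {
        return None;
    }
    let found = AtomicBool::new(false);
    thread::scope(|scope| {
        let handles: Vec<_> = ranges
            .iter()
            .map(|&(start, end)| {
                let found = &found;
                scope.spawn(move || {
                    for (i, window) in haystack[start..end].windows(needle.len()).enumerate() {
                        // Poll the flag every 4096 positions to keep the check cheap.
                        if i % 4096 == 0 && found.load(Ordering::Relaxed) {
                            return None;
                        }
                        if window == needle {
                            found.store(true, Ordering::Relaxed);
                            return Some(start + i);
                        }
                    }
                    None
                })
            })
            .collect();
        // Threads that gave up return None, so only real hits remain here.
        handles
            .into_iter()
            .filter_map(|h| h.join().expect("search thread panicked"))
            .min()
    })
}
```

If the word can occur more than once, this returns whichever hit was found before the others stopped, which is not necessarily the first one in the file.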
