[SOLVED] Spawning threads for file monitoring


#1

I’m trying to tail a number of files, each in its own thread. At the moment I’m just trying to print the new lines as they are received, but the intention is to later pass the collected lines back into one big bucket for parsing via an mpsc::channel. I’m spawning a thread for each file, but only the first file gets monitored - the second thread is never launched. I’m guessing I’ve got a fundamental misunderstanding of how the threads return to signal that the next one can be spawned - help much appreciated! =)

extern crate logwatcher;
use logwatcher::LogWatcher;

use std::thread;

fn file_monitor(filename: &str) {
    println!("Now in thread to monitor {}.", filename);
    let mut log_watcher = LogWatcher::register(filename.to_string()).unwrap();
    log_watcher.watch( |line: String| {
        println!("DATA: {}", line);
    });
}

fn main() {

    let file1 = "./test_file_1.data";
    let file2 = "./test_file_2.data";

    let mut files: Vec<&str> = Vec::new();

    files.push(file1);
    files.push(file2);

    let handle = thread::spawn(|| {
        for file in files {
            file_monitor(file);
        }
    });

    handle.join().expect("Error joining thread handles!");

}

#2

So, the misunderstanding here concerns the semantics of thread::spawn. This function creates one new thread and makes it run the closure that is passed as an argument; when the closure returns, the thread terminates. So as currently written, your code spawns a single thread, which begins to loop over the files, runs file_monitor on the first file, and blocks indefinitely inside log_watcher.watch() - it never reaches the second file.

The fix should be as simple as calling thread::spawn inside the loop over files, so that you create one thread per file, and collecting the handles in a vector instead of keeping a single handle. Something like this:

    let handles = Vec::with_capacity(files.len());
    for file in files {
        handles.push(
            thread::spawn(|| { file_monitor(file); })
        );
    }

    for handle in handles {
        handle.join().expect("Error joining thread handles!");
    }

#3

Thank you, @HadrienG! The compiler flagged that handles had to be mut and that the closure needed move, but it’s now working correctly, with lines being tailed from both files. Superb! :star_struck:

    let mut handles = Vec::with_capacity(files.len());
    for file in files {
        handles.push(
            thread::spawn(move || { file_monitor(file); })
        );
    }

    for handle in handles {
        handle.join().expect("Error joining thread handles!");
    }

#4

Note that threads can be somewhat expensive. If there are many files, you may have to add a queue to limit how many of them are processed in parallel.
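To illustrate that queueing idea (this is not from the code above, and it only suits jobs that run to completion - an indefinite tail like log_watcher.watch() would block its batch forever): a simple chunked loop bounds parallelism with nothing but std. MAX_PARALLEL and the name-length "work" function are made up for the example:

```rust
use std::thread;

// Hypothetical bound on simultaneous worker threads.
const MAX_PARALLEL: usize = 3;

// Runs `work` on each item, at most MAX_PARALLEL threads at a time,
// joining each batch of threads before spawning the next.
fn process_in_batches(items: Vec<String>, work: fn(&str) -> usize) -> usize {
    let mut total = 0;
    for chunk in items.chunks(MAX_PARALLEL) {
        let handles: Vec<_> = chunk
            .iter()
            .cloned()
            .map(|item| thread::spawn(move || work(&item)))
            .collect();
        for handle in handles {
            total += handle.join().expect("worker thread panicked");
        }
    }
    total
}

fn main() {
    // Seven simulated file names; the "work" just counts name bytes.
    let files: Vec<String> = (0..7).map(|i| format!("sink_{}.log", i)).collect();
    let total = process_in_batches(files, |name| name.len());
    println!("total: {}", total);
}
```

A production version would more likely use a shared work queue or a thread-pool crate, but the chunked loop shows the principle with no dependencies.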


#5

I’m going to be parsing metrics from Hadoop and Accumulo, so around 7 file sinks (I’ll use logrotate to manage the size of each one). Is there a general rule of thumb for limits around file processing using the approach in the code above?


#6

Some systems limit the number of open file descriptors per process to 1024. So if each thread does input/output processing on two files, you can run at most about 500 threads in parallel. The limit is lower if each job needs more files open, of course.

It is possible to lift the file descriptor limit using ulimit or setrlimit, so there are really no hard rules. And at a certain point (usually well before 500 threads), network or storage system bandwidth is saturated or the CPU is fully used, and using more threads at this point only increases the scheduling overhead.
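For reference, the current per-process descriptor limit can be inspected, and raised up to the hard limit, straight from the shell (the 4096 below is just an illustrative value):

```shell
# Show the current soft limit on open file descriptors
ulimit -n

# Show the hard limit, the ceiling an unprivileged process can raise to
ulimit -Hn

# To raise the soft limit for this shell session (value must not exceed
# the hard limit), you would run e.g.:
#   ulimit -n 4096
```

A program can do the same thing at startup via the setrlimit(2) system call with RLIMIT_NOFILE.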


#7

@fweimer thank you for the very useful info.