Filter multiple CSV files and write the output to another CSV with multithreading

Hello,
I'm trying to write a tool that reads every ".csv" file inside a directory recursively and produces an output file containing all the lines of these files that have "200" as "status_code".
The single-threaded version works and looks like this:

use scan_dir::ScanDir;
use std::fs::File;
use std::io::BufReader;
use std::path::PathBuf;

fn main() {
    let  ffuf_files : Vec<PathBuf>= ScanDir::files().walk("/home/neolex/reconbug/hunt/samsclub.com/", |iter| {
        iter.filter(|&(_, ref name)| name.ends_with(".csv"))
            .map(|(ref entry, _)| entry.path())
            .collect()
    }).unwrap();
    let mut wtr = csv::Writer::from_path("out.csv").expect("error writing file");
    wtr.write_record(&["FUZZ","url","redirectlocation","position","status_code","content_length","content_words","content_lines","resultfile"]).expect("error writing header");

    for file_path in ffuf_files {
        let input = File::open(file_path.as_path()).unwrap();
        let buffered = BufReader::new(input);
        let mut reader = csv::Reader::from_reader(buffered);
        for result in reader.records()
        {
            let record = result.expect("a CSV record");
            if &record[4] == "200" {
                wtr.write_record(&record).expect("error writing record");
            }
        }
    }

}

But it is still a little slow when I have a lot of big files, so I want to make it multithreaded, but I'm a bit lost.
I'm especially unsure how to write to the output file without messing everything up. I guess I need locks.

Could you help me, please?

What I recommend is to use a std::sync::mpsc channel for transferring the data to write. This lets you have a bunch of reader threads and one writer thread.
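
To make that layout concrete, here is a minimal sketch using only the standard library. The names filter_rows and run are made up for illustration, and the in-memory (url, status) tuples stand in for the CSV records; in the real program each reader thread would open a file with csv::Reader, and the writer thread would own the csv::Writer.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical stand-in for "read one CSV file and keep the matching rows".
fn filter_rows(rows: Vec<(String, u16)>, tx: &mpsc::Sender<String>) {
    for (url, status) in rows {
        if status == 200 {
            // If this fails, the writer has hung up, so stop early.
            if tx.send(url).is_err() {
                return;
            }
        }
    }
}

/// Runs one reader thread per input and a single writer thread.
fn run(inputs: Vec<Vec<(String, u16)>>) -> Vec<String> {
    let (tx, rx) = mpsc::channel();

    // The single writer thread owns the receiving end of the channel.
    let writer = thread::spawn(move || {
        let mut written = Vec::new();
        for row in rx {
            written.push(row);
        }
        written
    });

    // One reader thread per input, each with its own clone of the sender.
    let readers: Vec<_> = inputs
        .into_iter()
        .map(|rows| {
            let tx = tx.clone();
            thread::spawn(move || filter_rows(rows, &tx))
        })
        .collect();

    // Drop the original sender so the writer's loop ends once all readers finish.
    drop(tx);

    for reader in readers {
        reader.join().expect("reader thread panicked");
    }
    writer.join().expect("writer thread panicked")
}

fn main() {
    let inputs = vec![
        vec![("a".to_string(), 200), ("b".to_string(), 404)],
        vec![("c".to_string(), 200)],
    ];
    let mut out = run(inputs);
    out.sort(); // arrival order across threads is not deterministic
    println!("{:?}", out);
}
```

Because only the writer thread touches the output, no lock around the writer is needed.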

Thank you for your help.
I made something, but I get an error on tx.send(). The error is:

sending on a closed channel

Here is my code:

use scan_dir::ScanDir;
use std::path::PathBuf;
use std::fs::File;
use std::io::{BufReader, Read, BufRead};
use std::{io, thread};
use std::sync::mpsc::channel;


fn main() {
    let  ffuf_files : Vec<PathBuf>= ScanDir::files().walk("/home/neolex/reconbug/hunt/samsclub.com/", |iter| {
        iter.filter(|&(_, ref name)| name.ends_with(".csv"))
            .map(|(ref entry, _)| entry.path())
            .collect()
    }).unwrap();
    let mut wtr = csv::Writer::from_path("out.csv").expect("error writing file");
    wtr.write_record(&["FUZZ","url","redirectlocation","position","status_code","content_length","content_words","content_lines","resultfile"]);
    let (tx, rx) = channel();
    let filesLen = ffuf_files.len();
    for file_path in ffuf_files {
        let tx = tx.clone();
        thread::spawn(move || {
            let input = File::open(file_path.as_path()).unwrap();
            let buffered = BufReader::new(input);
            let mut reader = csv::Reader::from_reader(buffered);
            for result in reader.records()
            {
                let record = result.expect("a CSV record");
                if record[4] == String::from("200") {
                    //wtr.write_record(&record);
                    match tx.send(record) {
                        Ok(e) => e,
                        Err(e) => eprintln!("{}",e)
                    }
                }
            }
        });
    }
    for _ in 0..filesLen {
        wtr.write_record(&rx.recv().unwrap());
    }

}

Do you have any idea where the error comes from?

I got it working. Is it good, or did I make a mistake?

use scan_dir::ScanDir;
use std::path::PathBuf;
use std::fs::File;
use std::io::{BufReader, Read, BufRead};
use std::{io, thread};
use std::sync::mpsc::channel;
use std::thread::JoinHandle;


fn main() {
    let  ffuf_files : Vec<PathBuf>= ScanDir::files().walk("/home/neolex/reconbug/hunt/samsclub.com/", |iter| {
        iter.filter(|&(_, ref name)| name.ends_with(".csv"))
            .map(|(ref entry, _)| entry.path())
            .collect()
    }).unwrap();
    let mut wtr = csv::Writer::from_path("out.csv").expect("error writing file");
    wtr.write_record(&["FUZZ","url","redirectlocation","position","status_code","content_length","content_words","content_lines","resultfile"]);
    let (tx, rx) = channel();
    let filesLen = ffuf_files.len();
    let mut child= vec![];
    for file_path in ffuf_files {
        let tx = tx.clone();
        child.push(thread::spawn(move || {
            let input = File::open(file_path.as_path()).unwrap();
            let buffered = BufReader::new(input);
            let mut reader = csv::Reader::from_reader(buffered);
            for result in reader.records()
            {
                let record = result.expect("a CSV record");
                if record[4] == String::from("200") {
                    //wtr.write_record(&record);
                    match tx.send(record) {
                        Ok(e) => e,
                        Err(e) => eprintln!("{}",e)
                    }
                }
            }
        }));
    }
    for c in child {
        c.join();
    }
    for _ in 0..filesLen {
        wtr.write_record(&rx.recv().unwrap());
    }

}

Is there a way to keep calling rx.recv() until the channel is empty?

Yes. When all senders are gone, recv returns an error, and iterating over the receiver stops. It looks something like this:

// This makes sure there are no senders left when all threads are done.
drop(tx);

for message in rx {
    wtr.write_record(&rx.recv().unwrap());
}
// We can join after receiving.
for c in child {
    c.join();
}

If sending fails, you should probably just exit, because that can only happen if rx has been dropped before the call to send.
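
As a rough illustration of that advice, the sketch below stops a worker as soon as send reports an error. The function name send_until_closed is hypothetical, and plain integers stand in for the CSV records. In the earlier program the "sending on a closed channel" message most likely appeared for this reason: main received only a few records and then returned, dropping rx while the workers were still sending.

```rust
use std::sync::mpsc;
use std::thread;

/// Sends `items` until the receiver disappears; returns how many were sent.
fn send_until_closed(tx: mpsc::Sender<u32>, items: Vec<u32>) -> usize {
    let mut sent = 0;
    for item in items {
        // `send` returns Err only when the Receiver has been dropped,
        // so there is no point in continuing after that.
        if tx.send(item).is_err() {
            break;
        }
        sent += 1;
    }
    sent
}

fn main() {
    let (tx, rx) = mpsc::channel();
    drop(rx); // simulate the receiving side going away early
    let worker = thread::spawn(move || send_until_closed(tx, vec![1, 2, 3]));
    println!("sent {}", worker.join().expect("worker panicked"));
}
```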

Thank you, it works well like that!

Uhh, of course you should use message in the loop, not call .recv() again.

for message in rx {
    wtr.write_record(&message);
}
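
Iterating over rx also fixes a quieter problem with the earlier `for _ in 0..filesLen` loop: that loop calls recv once per file, but each file can send many records. Here is a small sketch of the difference, with hard-coded string slices standing in for CSV records (count_vs_iterate is a made-up name for illustration):

```rust
use std::sync::mpsc;
use std::thread;

/// Returns (records received by a once-per-file loop, records left over).
fn count_vs_iterate(files: Vec<Vec<&'static str>>) -> (usize, usize) {
    let file_count = files.len();
    let (tx, rx) = mpsc::channel();
    let mut workers = Vec::new();
    for records in files {
        let tx = tx.clone();
        workers.push(thread::spawn(move || {
            for record in records {
                tx.send(record).expect("receiver was dropped");
            }
        }));
    }
    // No senders remain once the workers finish.
    drop(tx);

    // The earlier approach: one recv per file.
    let mut counted = 0;
    for _ in 0..file_count {
        if rx.recv().is_ok() {
            counted += 1;
        }
    }
    // Everything the counting loop would have missed.
    let leftover = rx.iter().count();

    for worker in workers {
        worker.join().expect("worker panicked");
    }
    (counted, leftover)
}

fn main() {
    let files = vec![vec!["a", "b", "c"], vec!["d", "e"]];
    let (counted, leftover) = count_vs_iterate(files);
    println!("counting loop received {}, {} records were left over", counted, leftover);
}
```

With two files holding five records in total, the counting loop receives two records and three are left over, which in the real tool would mean rows missing from out.csv.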

I've read somewhere that I should avoid std::sync::mpsc and use crossbeam channels instead. I can't find where I read that, so I'm wondering if I misunderstood something.

The crossbeam channels are slightly faster and have more features, but there isn't anything wrong with std::sync::mpsc per se.