Hello,
I'm trying to write a tool that recursively reads every `.csv` file inside a directory and produces an output file containing every line of those files whose `status_code` is `200`.
The single-threaded version works, and it looks like this:
use scan_dir::ScanDir;
use std::path::PathBuf;
use std::fs::File;
use std::io::{BufReader, Read, BufRead};
use std::{io, thread};
/// Single-threaded version: recursively collects every `.csv` file under the
/// hunt directory and copies each record whose fifth column (index 4,
/// `status_code`) equals `"200"` into `out.csv`, after a fixed header row.
fn main() {
    let ffuf_files: Vec<PathBuf> = ScanDir::files()
        .walk("/home/neolex/reconbug/hunt/samsclub.com/", |iter| {
            iter.filter(|&(_, ref name)| name.ends_with(".csv"))
                .map(|(ref entry, _)| entry.path())
                .collect()
        })
        .unwrap();

    let mut wtr = csv::Writer::from_path("out.csv").expect("error writing file");
    // `write_record` returns a Result; ignoring it would silently lose write errors.
    wtr.write_record(&[
        "FUZZ",
        "url",
        "redirectlocation",
        "position",
        "status_code",
        "content_length",
        "content_words",
        "content_lines",
        "resultfile",
    ])
    .expect("failed to write header");

    for file_path in ffuf_files {
        let input = File::open(&file_path)
            .unwrap_or_else(|e| panic!("cannot open {}: {}", file_path.display(), e));
        let mut reader = csv::Reader::from_reader(BufReader::new(input));
        for result in reader.records() {
            let record = result.expect("a CSV record");
            // `get` avoids an index panic on rows with fewer than five fields
            // and compares as `&str` without allocating.
            if record.get(4) == Some("200") {
                wtr.write_record(&record).expect("failed to write record");
            }
        }
    }

    // Dropping the writer would flush too, but any error there is swallowed.
    wtr.flush().expect("failed to flush out.csv");
}
But it is still a little slow when there are many big files, so I want to make it multithreaded, but I'm a bit lost.
In particular, I'm not sure how to write to the output file without the threads interfering with each other; I guess I need locks.
Thank you for your help.
I made an attempt, but I get an error on `tx.send()`. The error is:
sending on a closed channel
Here is my code:
use scan_dir::ScanDir;
use std::path::PathBuf;
use std::fs::File;
use std::io::{BufReader, Read, BufRead};
use std::{io, thread};
use std::sync::mpsc::channel;
/// Multithreaded version: one worker thread per input file parses the CSV and
/// sends every record whose fifth column (index 4, `status_code`) is `"200"`
/// over a channel. The main thread is the only writer of `out.csv`, so no lock
/// is needed around the writer.
fn main() {
    let ffuf_files: Vec<PathBuf> = ScanDir::files()
        .walk("/home/neolex/reconbug/hunt/samsclub.com/", |iter| {
            iter.filter(|&(_, ref name)| name.ends_with(".csv"))
                .map(|(ref entry, _)| entry.path())
                .collect()
        })
        .unwrap();

    let mut wtr = csv::Writer::from_path("out.csv").expect("error writing file");
    wtr.write_record(&[
        "FUZZ",
        "url",
        "redirectlocation",
        "position",
        "status_code",
        "content_length",
        "content_words",
        "content_lines",
        "resultfile",
    ])
    .expect("failed to write header");

    let (tx, rx) = channel();
    for file_path in ffuf_files {
        let tx = tx.clone();
        thread::spawn(move || {
            let input = File::open(&file_path)
                .unwrap_or_else(|e| panic!("cannot open {}: {}", file_path.display(), e));
            let mut reader = csv::Reader::from_reader(BufReader::new(input));
            for result in reader.records() {
                let record = result.expect("a CSV record");
                if record.get(4) == Some("200") {
                    // `send` only fails once the receiver is gone; nothing
                    // will read further records, so stop this worker.
                    if let Err(e) = tx.send(record) {
                        eprintln!("{}", e);
                        return;
                    }
                }
            }
        });
    }

    // Drop main's own sender. The receive loop below ends only when *every*
    // Sender is gone, i.e. after each worker finishes and drops its clone.
    drop(tx);

    // The number of messages equals the number of matching records, not the
    // number of files, so drain until the channel closes instead of counting.
    for record in rx {
        wtr.write_record(&record).expect("failed to write record");
    }

    wtr.flush().expect("failed to flush out.csv");
}
Do you have any idea where the error comes from?
use scan_dir::ScanDir;
use std::path::PathBuf;
use std::fs::File;
use std::io::{BufReader, Read, BufRead};
use std::{io, thread};
use std::sync::mpsc::channel;
use std::thread::JoinHandle;
fn main() {
let ffuf_files : Vec<PathBuf>= ScanDir::files().walk("/home/neolex/reconbug/hunt/samsclub.com/", |iter| {
iter.filter(|&(_, ref name)| name.ends_with(".csv"))
.map(|(ref entry, _)| entry.path())
.collect()
}).unwrap();
let mut wtr = csv::Writer::from_path("out.csv").expect("error writing file");
wtr.write_record(&["FUZZ","url","redirectlocation","position","status_code","content_length","content_words","content_lines","resultfile"]);
let (tx, rx) = channel();
let filesLen = ffuf_files.len();
let mut child= vec![];
for file_path in ffuf_files {
let tx = tx.clone();
child.push(thread::spawn(move || {
let input = File::open(file_path.as_path()).unwrap();
let buffered = BufReader::new(input);
let mut reader = csv::Reader::from_reader(buffered);
for result in reader.records()
{
let record = result.expect("a CSV record");
if record[4] == String::from("200") {
//wtr.write_record(&record);
match tx.send(record) {
Ok(e) => e,
Err(e) => eprintln!("{}",e)
}
}
}
}));
}
for c in child {
c.join();
}
for _ in 0..filesLen {
wtr.write_record(&rx.recv().unwrap());
}
}
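Yes. The problem is that `for _ in 0..filesLen` receives one message per *file*, but each thread sends one message per matching *record*. As soon as `main` stops receiving and returns, `rx` is dropped and the remaining sends fail. Instead, drop your own `tx` and keep receiving until the channel closes. Once all senders are gone, `recv` returns `Err(RecvError)` and iterating over `rx` simply ends. It looks something like this: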
// This makes sure there are no senders left when all threads are done:
// the loop below ends once every worker has dropped its cloned sender.
drop(tx);
// Iterating the receiver yields each message until the channel closes;
// write the yielded message itself.
for message in rx {
    wtr.write_record(&message).expect("failed to write record");
}
// We can join after receiving; `join` returns Err if the worker panicked.
for c in child {
    c.join().expect("worker thread panicked");
}
If sending fails, the thread should probably just stop (e.g. return), because that can only happen if `rx` was dropped before the call to `send`.
I've read somewhere that I should avoid `std::sync::mpsc` and use crossbeam channels instead. I can't remember where I read it, so I'm wondering whether I misunderstood something.