How to fix a "too many open files" error

Hello,
I have a problem with a function that reads many files across multiple threads.
I get the error os error 24: Too many open files.

Here is my function:

 for file_path in ffuf_files {
        let tx = tx.clone();
        let filters = filters.clone();
        child.push(thread::spawn(move || {
            let input = File::open(file_path.as_path()).unwrap();
            let buffered = BufReader::new(input);
            let mut reader = csv::Reader::from_reader(buffered);
            'here: for result in reader.records() {
                match result {
                    Ok(record) => {
                        for (column, value) in &filters {
                            if record[*column] != *value {
                                continue 'here;
                            }
                        }
                        tx.send(record).unwrap();
                    }
                    Err(error) => {
                        eprintln!("{}: {:?}", error, file_path.as_path());
                    }
                }
            }
        }));
    }
    // This makes sure there are no senders left when all threads are done.
    drop(tx);
    let mut one_result = false;
    for message in &rx {
        one_result = true;
        if let Err(e) = wtr.write_record(&message) {
            eprintln!("Error for message : {}\nError : {}", message.as_slice(), e);
        }
    }
    // We can join after receiving.
    for c in child {
        c.join().unwrap();
    }
    one_result

It works well with a small number of files, but not with a lot of them. I guess I need a pool so that not every file is opened at once, but I have no idea what the best method is.
Should I use the threadpool crate?
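
Something like this is roughly what I mean (a minimal standalone sketch of the threadpool crate, not my real code; it assumes threadpool is added as a dependency in Cargo.toml and the per-file work is stubbed out):

use std::sync::mpsc::channel;
use threadpool::ThreadPool;

fn main() {
    // With 4 workers, at most 4 jobs run at the same time, so at most
    // 4 files would be open at once no matter how many jobs are queued.
    let pool = ThreadPool::new(4);
    let (tx, rx) = channel();
    for i in 0..1000 {
        let tx = tx.clone();
        pool.execute(move || {
            // a real job would open and filter one CSV file here
            tx.send(i).unwrap();
        });
    }
    drop(tx);
    // This loop ends once every job has finished and dropped its sender.
    for result in &rx {
        println!("job {} done", result);
    }
}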

Is the following any good?
I'm not sure whether I need to join() the threads and, if so, how; c.join() doesn't work since pool.spawn() doesn't give me a handle to join.


use std::fs::File;
use std::io::BufReader;
use std::path::PathBuf;
use std::sync::mpsc::channel;

fn process_ffuf_csv(ffuf_files: Vec<PathBuf>, filters: Vec<(usize, String)>) -> bool {
    let mut wtr = csv::Writer::from_path("out.csv").expect("error writing file");
    wtr.write_record(&[
        "FUZZ",
        "url",
        "redirectlocation",
        "position",
        "status_code",
        "content_length",
        "content_words",
        "content_lines",
        "resultfile",
    ])
    .unwrap();
    let pool = rayon::ThreadPoolBuilder::new()
        .num_threads(4)
        .build()
        .unwrap();

    let (tx, rx) = channel();
    for file_path in ffuf_files {
        let tx = tx.clone();
        let filters = filters.clone();
        pool.spawn(move || {
            let input = File::open(file_path.as_path()).unwrap();
            let buffered = BufReader::new(input);
            let mut reader = csv::Reader::from_reader(buffered);
            'here: for result in reader.records() {
                match result {
                    Ok(record) => {
                        for (column, value) in &filters {
                            if record[*column] != *value {
                                continue 'here;
                            }
                        }
                        tx.send(record).unwrap();
                    }
                    Err(error) => {
                        eprintln!("{}: {:?}", error, file_path.as_path());
                    }
                }
            }
        });
    }
    // This makes sure there are no senders left when all threads are done.
    drop(tx);
    let mut one_result = false;
    for message in &rx {
        one_result = true;
        if let Err(e) = wtr.write_record(&message) {
            eprintln!("Error for message : {}\nError : {}", message.as_slice(), e);
        }
    }
    one_result
}

That looks reasonable to me. You don't need to join the threads, because you loop over rx until the channel is closed, which won't happen until all the threads finish and drop their senders.
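
Here is a minimal standalone sketch of that mechanism, separate from your code, just to show the channel behaviour:

use std::sync::mpsc::channel;
use std::thread;

fn main() {
    let (tx, rx) = channel();
    for i in 0..4 {
        let tx = tx.clone();
        thread::spawn(move || tx.send(i).unwrap());
    }
    // Drop the original sender; the loop below keeps receiving until every
    // clone held by a spawned thread has been dropped, i.e. until every
    // thread is done with its sender.
    drop(tx);
    for value in &rx {
        println!("got {}", value);
    }
}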

Another way to wait for all threads to complete is rayon::scope.
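
For example, a rough (untested) sketch reusing the pool, ffuf_files, filters, tx and rx from your function; scope() only returns once every task spawned inside it has finished, so there is nothing to join:

let filters = &filters; // tasks can share a borrow instead of cloning the Vec per file
pool.scope(move |s| {
    for file_path in ffuf_files {
        let tx = tx.clone();
        s.spawn(move |_| {
            let input = File::open(file_path.as_path()).unwrap();
            let mut reader = csv::Reader::from_reader(BufReader::new(input));
            'here: for result in reader.records() {
                match result {
                    Ok(record) => {
                        for (column, value) in filters {
                            if record[*column] != *value {
                                continue 'here;
                            }
                        }
                        tx.send(record).unwrap();
                    }
                    Err(error) => eprintln!("{}: {:?}", error, file_path.as_path()),
                }
            }
        });
    }
    // The original `tx` moved into this closure is dropped when it returns.
});
// Every task has finished by the time scope() returns, so all senders are
// gone and the receiving loop over rx terminates on its own.
for message in &rx {
    // same filtering/writing as in the function above
}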
