Processing files with iterators

I want to write a simple toy program that filters a bunch of JSON files based on some fields.

It moves all files that share the same value for a given field into the same directory, and skips files that match a similar set of criteria. These fields may be nested. For example, all files that have the field foo set to bar are moved to a directory called bar.

I initially wrote it with a simple for loop that iterates over a Vec<PathBuf>, which I populate with fs::read_dir.

The core of this processing is this function:

fn process_file(
    it_file: &std::path::Path,
    by: Criteria,
    flt: &[FilterType],
    dst: &std::path::Path,
) -> anyhow::Result<()> {
    let fname = it_file
        .file_name()
        .ok_or_else(|| anyhow::anyhow!("Unable to extract a file name from {:?}", it_file))?;
    let file = File::open(it_file)?;
    let reader = BufReader::new(file);
    let json: serde_json::Value = serde_json::from_reader(reader)?;

    // In case we don't want to skip this file, copy it to the proper directory.
    if !should_skip(&json, &flt) {
        let criteria = get_criteria_value(by, &json).unwrap_or("__unknown__");
        let dst_dir = dst.join(criteria);

        fs::create_dir_all(&dst_dir)?;
        let dst_file = dst_dir.join(fname);
        fs::copy(it_file, dst_file)?;
    }

    Ok(())
}

Criteria and FilterType are simple enums that I map to certain fields. I use them instead of raw field names because the fields I care about may be nested. should_skip and get_criteria_value simply match on those enums. I won't paste their implementations here, to keep the post short. should_skip returns true or false, while get_criteria_value returns an Option<&str> with the field value. A missing field is not an error, but I do care about reporting all other possible errors.

I managed to rewrite this using scan:

    let r: Vec<Result<()>> = files
        .iter()
        .scan(0, |_, it_file| {
            Some(process_file(it_file, by, &flt, dst))
        })
        .collect();

This works, but I'm not sure if scan is the best fit here. The docs say:

scan() takes two arguments: an initial value which seeds the internal state, and a closure with two arguments, the first being a mutable reference to the internal state and the second an iterator element. The closure can assign to the internal state to share state between iterations.

I'm not really using this feature. I just want the Vec<Result> so I can look at all the possible errors that appeared.
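
For comparison, here is a minimal std-only sketch of the same shape with the state left unused. The `process` function is a hypothetical stand-in for `process_file`, and `String` replaces `anyhow::Error` to keep it self-contained. When the state is never touched and the closure always returns `Some`, `scan` behaves like a plain `map`:

```rust
use std::path::{Path, PathBuf};

// Hypothetical stand-in for `process_file`: fails for any path without a
// `.json` extension.
fn process(path: &Path) -> Result<(), String> {
    match path.extension().and_then(|ext| ext.to_str()) {
        Some("json") => Ok(()),
        _ => Err(format!("not a JSON file: {}", path.display())),
    }
}

// `scan` with a state that is never read or written...
fn with_scan(files: &[PathBuf]) -> Vec<Result<(), String>> {
    files
        .iter()
        .scan((), |_, file| Some(process(file)))
        .collect()
}

// ...yields the same results as a plain `map`.
fn with_map(files: &[PathBuf]) -> Vec<Result<(), String>> {
    files.iter().map(|file| process(file)).collect()
}
```

Both functions visit every file and keep every result, so neither stops at the first error.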

I guess I could use try_for_each and stop on the first error, but is there a way of knowing which file it happened for? Or do I have to pass that back manually somehow?

This brings me to my next problem: what if I want to do the work in parallel using something like rayon? Fortunately, it was pretty easy to do that:
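
One way to pass the file back manually is to attach the path to the error inside the closure. The sketch below is std-only; `process` and `process_all` are hypothetical names, and the error type is a plain tuple rather than `anyhow::Error`. With anyhow, the `Context::with_context` method could probably play the same role by adding the path to the error message.

```rust
use std::path::{Path, PathBuf};

// Hypothetical stand-in for `process_file`; the error itself carries no path.
fn process(path: &Path) -> Result<(), String> {
    if path.extension().is_some() {
        Ok(())
    } else {
        Err("missing extension".to_string())
    }
}

// Stops at the first failure and returns the offending path with the error.
fn process_all(files: &[PathBuf]) -> Result<(), (PathBuf, String)> {
    files
        .iter()
        .try_for_each(|file| process(file).map_err(|err| (file.clone(), err)))
}
```

Calling `process_all(&files)` then yields either `Ok(())` or an `Err` holding the first path that failed together with its error.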

    let r = files
        .into_par_iter()
        .try_for_each(|it_file| process_file(&it_file, by, &flt, dst));

There is no equivalent for scan, so now I'm simply getting a Result<()> with no way of knowing the file for which I got an error.

On a similar note, is there a way of returning something other than Result<()>? If I change process_file to return Result<u32>, for example, and I try:

    let r = files
        .iter()
        .try_for_each(|it_file| process_file(&it_file, by, &flt, dst));

I get the following error:

error[E0271]: type mismatch resolving `<Result<u32, anyhow::Error> as Try>::Ok == ()`
   --> src\main.rs:263:10
    |
263 |         .try_for_each(|it_file| process_file(&it_file, by, &flt, dst));
    |          ^^^^^^^^^^^^ expected `u32`, found `()`

I don't really understand what this error refers to.
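
For context, the error comes from the bound on `try_for_each`: its closure has to return a `Try` type whose success value is `()`, because nothing would collect any other value. A minimal std-only sketch of two alternatives that do keep the `Ok` values follows. The names `process`, `all_values` and `total` are hypothetical, and parsing a number stands in for processing a file.

```rust
// Hypothetical stand-in for a `process_file` that returns a value on success:
// it parses the input as a number.
fn process(input: &str) -> Result<u32, String> {
    input
        .parse::<u32>()
        .map_err(|err| format!("{:?}: {}", input, err))
}

// Collects every `Ok` value, or returns the first error.
fn all_values(inputs: &[&str]) -> Result<Vec<u32>, String> {
    inputs.iter().map(|input| process(input)).collect()
}

// Folds the `Ok` values into a running total, stopping at the first error.
fn total(inputs: &[&str]) -> Result<u32, String> {
    inputs
        .iter()
        .try_fold(0u32, |sum, input| process(input).map(|n| sum + n))
}
```

Collecting into `Result<Vec<_>, _>` keeps each value, while `try_fold` suits cases where only an aggregate such as a count or a sum is needed.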

So, to recap my questions:

  1. is scan a good fit in my initial example?
  2. is there a way to know for which array element try_for_each stopped with an error?
  3. is there a way to return meaningful information in the Ok case?

You can do into_par_iter().map(returns_result).collect::<Vec<Result<_, _>>>() and at the end go through the Vec of results, checking which files failed and which didn't. It won't stop on the first error.
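
A minimal sketch of that idea, written against std iterators so it runs without extra crates. The names `process`, `process_all` and `failed` are hypothetical, and `String` stands in for `anyhow::Error`. With rayon, replacing `iter()` with `par_iter()` should give the parallel version, though that part is an assumption and is not shown here. Pairing each result with its path makes it easy to tell which files failed:

```rust
use std::path::{Path, PathBuf};

// Hypothetical stand-in for `process_file`.
fn process(path: &Path) -> Result<(), String> {
    match path.extension() {
        Some(_) => Ok(()),
        None => Err("missing extension".to_string()),
    }
}

// Processes every file and keeps each path next to its result.
fn process_all(files: &[PathBuf]) -> Vec<(&Path, Result<(), String>)> {
    files
        .iter()
        .map(|file| (file.as_path(), process(file)))
        .collect()
}

// Returns the paths whose result is an error.
fn failed<'a>(results: &[(&'a Path, Result<(), String>)]) -> Vec<&'a Path> {
    results
        .iter()
        .filter(|(_, result)| result.is_err())
        .map(|(path, _)| *path)
        .collect()
}
```

Every file is processed regardless of earlier failures, and the caller decides afterwards how to report them.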

Alternatively, use rayon::spawn for fire-and-forget spawning of work, and then use crossbeam-channel to get results back.
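
A rough sketch of the channel approach, using `std::thread::spawn` and `std::sync::mpsc` as std-only stand-ins for rayon's thread pool and crossbeam-channel. That substitution, and the names `process` and `process_all`, are assumptions made to keep the example self-contained. Each worker sends its path and result back over the channel:

```rust
use std::path::{Path, PathBuf};
use std::sync::mpsc;
use std::thread;

// Hypothetical stand-in for `process_file`.
fn process(path: &Path) -> Result<(), String> {
    match path.extension() {
        Some(_) => Ok(()),
        None => Err("missing extension".to_string()),
    }
}

// Spawns one worker per file and gathers (path, result) pairs as they arrive.
fn process_all(files: Vec<PathBuf>) -> Vec<(PathBuf, Result<(), String>)> {
    let (tx, rx) = mpsc::channel();

    for file in files {
        let tx = tx.clone();
        // One thread per file keeps the sketch short; a thread pool would
        // normally take this role.
        thread::spawn(move || {
            let result = process(&file);
            // Sending only fails if the receiver is gone, which cannot happen
            // here because `rx` lives until the end of the function.
            let _ = tx.send((file, result));
        });
    }

    // Drop the original sender so the receiving loop ends once every worker
    // has finished and dropped its clone.
    drop(tx);

    rx.iter().collect()
}
```

Results arrive in completion order, not input order, which is why each message carries its path.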

Thanks, it works. Are there any performance differences between the two approaches? Assuming that no errors are encountered, is there any overhead from collecting all the results? I imagine there has to be some, but is it noticeable?

Also thanks for the tip. I'm mostly at a phase in which I'm trying to rewrite my code using iterators just to get used to the idea, but I'll keep this in mind.

It does add an allocation, but in the happy case that should be just one allocation. As long as the number of files is small enough for the results to fit in memory, it's likely not a problem. Use a profiler to verify performance.
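
If only the failures matter, one option is to drop the `Ok` results before collecting, so the vector grows with the number of errors and not with the number of files. This is a std-only sketch with hypothetical names (`process`, `failures`); whether it makes a measurable difference is something a profiler would have to confirm.

```rust
use std::path::{Path, PathBuf};

// Hypothetical stand-in for `process_file`.
fn process(path: &Path) -> Result<(), String> {
    match path.extension() {
        Some(_) => Ok(()),
        None => Err("missing extension".to_string()),
    }
}

// Keeps only the failing paths and their errors; successes are discarded.
fn failures(files: &[PathBuf]) -> Vec<(&Path, String)> {
    files
        .iter()
        .filter_map(|file| process(file).err().map(|err| (file.as_path(), err)))
        .collect()
}
```

When every file succeeds, the returned vector is empty.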
