'Simple' wordcount example and iterator ergonomics


#1

Hi all,
I’m playing around with Rust, enjoying some bits and having trouble with others. I show up on IRC pretty frequently asking for help, and people have been kind enough to help me get through the various compilation errors.

One of the toy examples I’m trying to create is a word count. It reads text, splits it on whitespace, and counts the number of times each token appears. I’m having trouble getting it to work using iterators, and I'm wondering whether I just don't understand this stuff well enough, or whether using iterators over str/String, and tuples thereof, really is more painful than it needs to be.

Some background: the simple word count basically runs the following function to fill a BTreeMap that maps each token to a u32 count:

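```
use std::collections::BTreeMap;
use std::fs::File;
use std::io::{BufRead, BufReader};
use std::path::Path;

type CountMap = BTreeMap<String, u32>;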

fn process_file(path: &Path) -> CountMap {
    let file = File::open(&path).unwrap();
    let rdr = BufReader::new(file);
    let mut mapped : BTreeMap<String, u32> = BTreeMap::new();
    for line in rdr.lines() {
        match line {
            Ok(line_) => {
                for token in line_.split(char::is_whitespace) {
                    // Filter out multiple spaces delimiting to empty strings.
                    if token.len() > 0 {
                        *mapped.entry(token.to_owned()).or_insert(0) += 1;
                    }
                }
            }
            Err(e) => { 
                println!("Error reading file: {}", e);
                panic!("Error!");
            }
        }
    }
    mapped
}
```

(Aside: one thing to note here is that splitting with `split(char::is_whitespace)` turns runs of multiple spaces into 0-length tokens. I can see this being OK for a CSV file containing `1,2,,4`, but with whitespace I don't think anyone would expect `1 2<2 spaces>4` to parse as 4 tokens.)
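For comparison, here is a minimal sketch of the same counting loop. It assumes it is acceptable to take any `BufRead` instead of a `&Path`, which makes it easy to test with an in-memory `Cursor`, and to return I/O errors instead of panicking. `str::split_whitespace` skips runs of whitespace, so the empty-token check is no longer needed. The name `count_words` is hypothetical.

```rust
use std::collections::BTreeMap;
use std::io::{self, BufRead, Cursor};

type CountMap = BTreeMap<String, u32>;

/// Counts whitespace-separated tokens from any buffered reader.
/// `count_words` is a hypothetical helper, not part of the original code.
fn count_words<R: BufRead>(reader: R) -> io::Result<CountMap> {
    let mut counts = CountMap::new();
    for line in reader.lines() {
        // `?` hands the I/O error back to the caller instead of panicking.
        let line = line?;
        // split_whitespace never yields empty tokens, even for runs of spaces.
        for token in line.split_whitespace() {
            *counts.entry(token.to_owned()).or_insert(0) += 1;
        }
    }
    Ok(counts)
}

fn main() -> io::Result<()> {
    // Two spaces between "2" and "4": `split(char::is_whitespace)` would yield an empty token here.
    let counts = count_words(Cursor::new("1 2  4\n2 4"))?;
    for (word, n) in &counts {
        println!("{}\t{}", word, n);
    }
    Ok(())
}
```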

The simple case was easy to implement, and I was really happy with the performance. But I wanted to try reading the input on multiple threads and aggregating the values. So I wrote most of a version that reads a Hadoop-style split file (a directory with multiple file parts called `part-00000`, `part-00001`, etc.) on multiple threads. This version runs a scoped thread for each file part and then uses a [merging iterator adapter](http://bluss.github.io/rust-itertools/doc/itertools/trait.Itertools.html#method.merge_by) to print the results without copying all the values into one large tree (in the hope of doing a single, more efficient pass).
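To illustrate what a merging adapter like `merge_by` does, here is a std-only sketch (not the itertools implementation) that lazily interleaves two iterators which are each already sorted by key, using `Peekable`. `MergeSorted` and `merge_sorted` are hypothetical names. Because `BTreeMap::into_iter` yields keys in ascending order, merging per-part maps this way leaves equal keys adjacent, ready for a grouping step.

```rust
use std::collections::BTreeMap;
use std::iter::Peekable;

/// Hypothetical adapter: lazily merges two iterators that are each sorted by key.
struct MergeSorted<A: Iterator, B: Iterator> {
    a: Peekable<A>,
    b: Peekable<B>,
}

impl<K: Ord, V, A, B> Iterator for MergeSorted<A, B>
where
    A: Iterator<Item = (K, V)>,
    B: Iterator<Item = (K, V)>,
{
    type Item = (K, V);

    fn next(&mut self) -> Option<(K, V)> {
        // Take from `a` when its key is <= `b`'s key (or `b` is exhausted);
        // preferring `a` on ties keeps the merge stable.
        let take_a = match (self.a.peek(), self.b.peek()) {
            (Some(x), Some(y)) => x.0 <= y.0,
            (Some(_), None) => true,
            (None, _) => false,
        };
        if take_a { self.a.next() } else { self.b.next() }
    }
}

/// Hypothetical constructor accepting anything iterable, such as a BTreeMap.
fn merge_sorted<K: Ord, V, A, B>(a: A, b: B) -> MergeSorted<A::IntoIter, B::IntoIter>
where
    A: IntoIterator<Item = (K, V)>,
    B: IntoIterator<Item = (K, V)>,
{
    MergeSorted { a: a.into_iter().peekable(), b: b.into_iter().peekable() }
}

fn main() {
    let mut part1 = BTreeMap::new();
    part1.insert("a".to_string(), 1u32);
    part1.insert("c".to_string(), 2);
    let mut part2 = BTreeMap::new();
    part2.insert("b".to_string(), 5u32);
    part2.insert("c".to_string(), 1);
    // Equal keys come out next to each other.
    for (k, v) in merge_sorted(part1, part2) {
        println!("{}\t{}", k, v);
    }
}
```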

However, I'm having a lot of difficulty iterating over the String keys in the BTreeMap's key-value tuples, and my example doesn't build:

```
#![feature(path_ext)]
#![feature(core)]
extern crate itertools;
extern crate core;

use std::borrow::ToOwned;
use std::collections::BTreeMap;
use std::fs::{File, read_dir, PathExt, DirEntry};
use std::io::{BufRead, BufReader};
use std::iter::Iterator;
use std::path::Path;
use std::thread;
use itertools::Itertools;

type CountMap = BTreeMap<String, u32>;

fn process_file(path: &Path) -> CountMap {
    println!("mapping: {}", path.to_str().unwrap_or(""));
    let file = File::open(&path).unwrap();
    let rdr = BufReader::new(file);
    let mut mapped : BTreeMap<String, u32> = BTreeMap::new();
    for line in rdr.lines() {
        match line {
            Ok(line_) => {
                for token in line_.split(char::is_whitespace) {
                    // Filter out multiple spaces delimiting to empty strings.
                    if token.len() > 0 {
                        *mapped.entry(token.to_owned()).or_insert(0) += 1;
                    }
                }
            }
            Err(e) => { 
                println!("Error reading file: {}", e);
                panic!("Error!");
            }
        }
    }
    mapped
}

fn print_counts<I, K, V>(counts : I) where
    I: Iterator<Item=(K, V)>,
    K: ::std::fmt::Display,
    V: ::std::fmt::Display,
{
    for (key, value) in counts {
        println!("{}\t{}", key, value);
    }
}

fn main() {
    let args : Vec<String> = ::std::env::args().collect();
    if args.len() < 2 {
        println!("Usage: wordcount <infile|indir> <outfile>");
        return;
    }
    let filename = &args[1];
    let path = ::std::path::Path::new(filename);
    if path.is_file() {
        println!("Just a file");
        let mapped = process_file(path);
        print_counts(mapped.iter());
    } else if path.is_dir() {
        println!("Partitions call for threads");
        let mut guards = vec![];
        for entry in read_dir(path).unwrap() {
            let p : DirEntry = entry.unwrap();
            let guard = thread::scoped(move || {
                process_file(p.path().as_path())
            });
            guards.push(guard);
        }
        let counts = guards.into_iter().map(|x|{x.join()}).collect::<Vec<CountMap>>();
        println!("shuffle/reduce.");
        let mut counts_iter = counts.into_iter();
        let zero : Box<Iterator<Item=(String, u32)>> = Box::new(counts_iter.next().unwrap().into_iter());
        let merged_counts : Box<Iterator<Item=(String, u32)>> = counts_iter.fold(zero, |acc, item| {
            Box::new(acc.merge_by(item.into_iter(), |a: &(String, u32), b: &(String, u32)| {
                a.0.cmp(&b.0)
            }))
        });

        let grouped = merged_counts.group_by(|&(a, _)| { a.clone() });

        let aggregated = grouped.map( |(k, v)| {
            (k, v.iter().map(|&(ref a, b)|{ 
                b 
            }).sum::<u32>())
        });
        print_counts(aggregated);
    }
}
```

So my question is basically: am I doing something wrong that makes the iterator expressions so baroque? It's an unfair comparison, but in a Scala example using Spark, this becomes just:
```
val file = sc.textFile("my-input-file")
val counts = file.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
counts.saveAsTextFile("my-output-file")
```
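For a rough side-by-side, here is a single-threaded Rust analogue of that Spark pipeline, assuming the input is already in memory as lines. `flat_map` plays the role of `flatMap`, `map` builds `(word, 1)` pairs, and a `fold` into a `BTreeMap` stands in for `reduceByKey`, which std does not provide. It uses `split_whitespace` rather than splitting on a single space, and `word_count` is a hypothetical name.

```rust
use std::collections::BTreeMap;

/// Hypothetical helper mirroring flatMap -> map -> reduceByKey.
/// Keys borrow from the input text, so no Strings are allocated.
fn word_count<'a, I>(lines: I) -> BTreeMap<&'a str, u32>
where
    I: IntoIterator<Item = &'a str>,
{
    lines
        .into_iter()
        .flat_map(|line| line.split_whitespace()) // like flatMap(line => line.split(" "))
        .map(|word| (word, 1))                     // like map(word => (word, 1))
        .fold(BTreeMap::new(), |mut acc, (word, n)| {
            *acc.entry(word).or_insert(0) += n;    // like reduceByKey(_ + _)
            acc
        })
}

fn main() {
    let text = "the cat\nthe dog";
    for (word, n) in word_count(text.lines()) {
        println!("{}\t{}", word, n);
    }
}
```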

I've seen a lot of ink spilled over API ergonomics, but this doesn't feel terribly ergonomic. When I use `Vec<u32>`s, as all the tests and examples do, it generally works very well.

Anyway, thanks for any help here. If you have examples of how to golf the code, I think they would be instructive not only for me but for others who come to the forum to see how experienced people would tune up beginners' code.

#2

The compilation error I see is this one (I hope it’s the one you see; general reminder: when asking questions that include compilation failures, include the compiler output!):

```
src/main.rs:82:47: 82:54 error: cannot move out of borrowed content
src/main.rs:82         let grouped = merged_counts.group_by(|&(a, _)| { a.clone() });
                                                             ^~~~~~~
src/main.rs:82:49: 82:50 note: attempting to move value to here
src/main.rs:82         let grouped = merged_counts.group_by(|&(a, _)| { a.clone() });
                                                               ^
src/main.rs:82:49: 82:50 help: to prevent the move, use `ref a` or `ref mut a` to capture value by reference
```

This can be solved as the ‘help’ suggests: `|&(ref a, _)| { a.clone() }`.
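The pattern `&(a, _)` tries to move the `String` out of a tuple that the closure only borrows; `ref a` binds by reference instead, so the clone is the only copy. A small std-only illustration, using a plain `iter().map` in place of itertools' `group_by` so it runs without extra crates (the helper names are hypothetical). Since Rust 1.26, default binding modes also let `(a, _)` match a `&(String, u32)` and bind `a` by reference automatically.

```rust
/// Clones the key out of each borrowed `(String, u32)` pair.
fn keys_with_ref(pairs: &[(String, u32)]) -> Vec<String> {
    // `|&(a, _)| a.clone()` would not compile: it tries to move the String
    // out of a tuple the closure only borrows. `ref a` binds `a: &String`.
    pairs.iter().map(|&(ref a, _)| a.clone()).collect()
}

/// Same result using default binding modes: matching `&(String, u32)`
/// against `(a, _)` binds `a` by reference without writing `ref`.
fn keys_with_binding_modes(pairs: &[(String, u32)]) -> Vec<String> {
    pairs.iter().map(|(a, _)| a.clone()).collect()
}

fn main() {
    let pairs: Vec<(String, u32)> = vec![("apple".to_string(), 3), ("pear".to_string(), 1)];
    println!("{:?}", keys_with_ref(&pairs));
    println!("{:?}", keys_with_binding_modes(&pairs));
}
```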

However, in this case, I think forgoing iterators for the “complicated” merge is easiest:

```
// ...
println!("shuffle/reduce.");
let mut counts_iter = counts.into_iter();

let mut counts = counts_iter.next().unwrap();
for more_counts in counts_iter {
    for (k, count) in more_counts {
        *counts.entry(k).or_insert(0) += count;
    }
}

print_counts(counts.into_iter());
```
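A self-contained version of that merge loop, with in-memory maps standing in for the per-thread results (`merge_counts` is a hypothetical name). Using the first map as the accumulator means a key is moved in only when it is missing; otherwise the incoming `String` is dropped after its count is added. This sketch returns an empty map for empty input instead of panicking on `unwrap()`.

```rust
use std::collections::BTreeMap;

type CountMap = BTreeMap<String, u32>;

/// Hypothetical helper: folds all per-part maps into the first one.
fn merge_counts(parts: Vec<CountMap>) -> CountMap {
    let mut parts_iter = parts.into_iter();
    // Empty input gives an empty map rather than a panic.
    let mut counts = parts_iter.next().unwrap_or_default();
    for more_counts in parts_iter {
        for (k, count) in more_counts {
            // `entry` takes the key by value; it is stored only if absent.
            *counts.entry(k).or_insert(0) += count;
        }
    }
    counts
}

fn main() {
    let mut a = CountMap::new();
    a.insert("x".to_string(), 1);
    let mut b = CountMap::new();
    b.insert("x".to_string(), 2);
    b.insert("y".to_string(), 5);
    for (k, v) in merge_counts(vec![a, b]) {
        println!("{}\t{}", k, v);
    }
}
```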

One could even write an iterator consumer similar to `reduceByKey`. I think this is the missing piece: there’s a moderately complicated operation happening here (in terms of implementing it with lazy iterators), and neither std nor itertools provides exactly the right piece to slot in, while Spark does.

```
fn reduce_by_key<I, K, T, F>(it: I, mut f: F) -> BTreeMap<K, T>
    where I: Iterator<Item = (K, T)>, F: FnMut(&mut T, T), K: Ord
{
    use std::collections::btree_map;
    let mut map = BTreeMap::new();
    for (k, t) in it {
        match map.entry(k) {
            btree_map::Entry::Occupied(o) => f(o.into_mut(), t),
            btree_map::Entry::Vacant(v) => { v.insert(t); }
        }
    }
    map
}
```
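A usage sketch for `reduce_by_key`, repeated with the imports it needs and applied to `(word, 1)` pairs the way Spark's `reduceByKey(_ + _)` is typically used. The input text is made up for illustration; the test also shows that any combining function works, not just addition.

```rust
use std::collections::btree_map;
use std::collections::BTreeMap;

fn reduce_by_key<I, K, T, F>(it: I, mut f: F) -> BTreeMap<K, T>
where
    I: Iterator<Item = (K, T)>,
    F: FnMut(&mut T, T),
    K: Ord,
{
    let mut map = BTreeMap::new();
    for (k, t) in it {
        match map.entry(k) {
            // Key seen before: combine the new value into the stored one.
            btree_map::Entry::Occupied(o) => f(o.into_mut(), t),
            // First occurrence: store the value unchanged.
            btree_map::Entry::Vacant(v) => {
                v.insert(t);
            }
        }
    }
    map
}

fn main() {
    let words = "to be or not to be".split_whitespace();
    let counts = reduce_by_key(words.map(|w| (w, 1u32)), |acc, n| *acc += n);
    for (w, n) in &counts {
        println!("{}\t{}", w, n);
    }
}
```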

Both of these have slightly different semantics from the original iterator version (they aren’t lazy), but the latter allows the core of the main code to become:

```
let counts = guards.into_iter().flat_map(|x| x.join());
print_counts(reduce_by_key(counts, |x, y| *x += y).into_iter());
```

On that note, one could “golf” the thread creation too:

```
let guards = read_dir(path).unwrap().map(|entry| {
    let p = entry.unwrap();
    thread::scoped(move || process_file(&p.path()))
}).collect::<Vec<_>>();
```
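`thread::scoped` was removed from std before Rust 1.0; on current Rust the closest equivalent is `std::thread::scope` (stable since 1.63). A sketch of the same spawn-all-then-join shape, assuming the part files are already in memory: the strings stand in for the `part-0000N` files, and `count_part` is a hypothetical stand-in for `process_file`.

```rust
use std::collections::BTreeMap;
use std::thread;

type CountMap = BTreeMap<String, u32>;

/// Hypothetical stand-in for `process_file`, working on in-memory text.
fn count_part(text: &str) -> CountMap {
    let mut counts = CountMap::new();
    for token in text.split_whitespace() {
        *counts.entry(token.to_owned()).or_insert(0) += 1;
    }
    counts
}

/// Counts each part on its own scoped thread and returns the per-part maps.
fn count_parts_in_parallel(parts: &[&str]) -> Vec<CountMap> {
    thread::scope(|s| {
        // Collect all handles before joining so the threads actually run concurrently.
        let handles: Vec<_> = parts
            .iter()
            .map(|&text| s.spawn(move || count_part(text)))
            .collect();
        handles
            .into_iter()
            .map(|h| h.join().expect("worker thread panicked"))
            .collect()
    })
}

fn main() {
    let parts = ["a b a", "b c", "a"];
    for (i, counts) in count_parts_in_parallel(&parts).iter().enumerate() {
        println!("part {}: {:?}", i, counts);
    }
}
```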

#3

> general reminder: when asking questions that include compilation failures, include the compiler output

That was indeed the error I saw. Sorry for not including it!

> which can be solved as the ‘help’ suggests: `|&(ref a, _)| { a.clone() }`.

Ha, I had been through so many minor edits that I had no idea I was so close. I thought I had tried using `ref` with the `clone` call in the closure, but obviously not. I took this solution and also replaced `clone` with `to_owned`, hoping to avoid a copy (though for a `&String` both produce a new `String`). The difference was marginal, since the runtime is dominated by reading my sample file (512MB in 64MB chunks).

Your resulting `reduce_by_key` is really pleasant: the calling API becomes very elegant, and it’s very easy to read. But the performance takes a hit. And one of the main points of my post was to highlight some of the difficulties of programming through the iterator APIs. Part of this is because I am trying to do something for which the key iterator adapter is (or was) missing. And part of it is that ownership gets really confusing, given how few examples I’ve seen of iterating over tuples, strings, and tuples of strings.

How would you go about joining multiple containers lazily and then reducing their values? Or does this call for an RDD-style API, which may or may not be based on the iterator interfaces?
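One std-only way to do this lazily, sketched under the assumption that every input yields `(key, count)` pairs sorted by key (true for `BTreeMap::into_iter`): keep the current head of each input in a `BinaryHeap` wrapped in `Reverse` (a k-way merge), and sum each run of equal keys as it comes out. `MergeSum` is a hypothetical name, not a std or itertools adapter; it pulls one item from each input up front and the rest on demand.

```rust
use std::cmp::Reverse;
use std::collections::{BTreeMap, BinaryHeap};

/// Hypothetical lazy adapter: k-way merge of key-sorted (K, u32) iterators,
/// yielding each key once with its counts summed.
struct MergeSum<K, I> {
    sources: Vec<I>,
    // Min-heap of (next key, its count, index of the source it came from).
    heap: BinaryHeap<Reverse<(K, u32, usize)>>,
}

impl<K: Ord, I: Iterator<Item = (K, u32)>> MergeSum<K, I> {
    fn new(mut sources: Vec<I>) -> Self {
        let mut heap = BinaryHeap::new();
        for (idx, src) in sources.iter_mut().enumerate() {
            if let Some((k, n)) = src.next() {
                heap.push(Reverse((k, n, idx)));
            }
        }
        MergeSum { sources, heap }
    }

    /// Pops the smallest head and refills the heap from the same source.
    fn pop_head(&mut self) -> Option<(K, u32)> {
        let Reverse((k, n, idx)) = self.heap.pop()?;
        if let Some((k2, n2)) = self.sources[idx].next() {
            self.heap.push(Reverse((k2, n2, idx)));
        }
        Some((k, n))
    }
}

impl<K: Ord, I: Iterator<Item = (K, u32)>> Iterator for MergeSum<K, I> {
    type Item = (K, u32);

    fn next(&mut self) -> Option<(K, u32)> {
        let (key, mut total) = self.pop_head()?;
        // Equal keys are adjacent in merged order, so sum until the key changes.
        while let Some(Reverse((next_key, _, _))) = self.heap.peek() {
            if *next_key != key {
                break;
            }
            let (_, n) = self.pop_head().expect("peeked entry exists");
            total += n;
        }
        Some((key, total))
    }
}

fn main() {
    let parts: Vec<BTreeMap<&str, u32>> = vec![
        vec![("a", 1), ("c", 2)].into_iter().collect(),
        vec![("b", 5), ("c", 1)].into_iter().collect(),
        vec![("a", 4)].into_iter().collect(),
    ];
    let merged = MergeSum::new(parts.into_iter().map(|m| m.into_iter()).collect());
    for (k, n) in merged {
        println!("{}\t{}", k, n);
    }
}
```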


#4

Since I’m stubborn, I’ve tried to get your original code to compile with .merge_by and .group_by.

The strategy is this: use the counts vector as the owner of all the mappings and produce the merged collection only as references into this owner. This allows merging and grouping to run without cloning any string keys.

We still use boxed iterators, unfortunately, since they’re needed to unify the types across the iterator fold operation.
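The boxes are needed because each adapter call wraps the accumulator in a new, deeper type, so the closure passed to `fold` would otherwise return a different type at every step. Erasing the type behind a boxed trait object gives `fold` one accumulator type. A minimal std-only illustration of the same shape, using `chain` as a stand-in for `merge_by` (so the output is concatenated rather than sorted). `chain_all` and the `Pairs` alias are hypothetical, and current Rust spells the trait object `dyn Iterator`.

```rust
use std::collections::BTreeMap;

/// Hypothetical alias for the type-erased accumulator.
type Pairs<'a> = Box<dyn Iterator<Item = (&'a String, &'a u32)> + 'a>;

/// Folds any number of maps into one boxed iterator of borrowed pairs.
fn chain_all<'a>(maps: &'a [BTreeMap<String, u32>]) -> Pairs<'a> {
    // An empty starting iterator, playing the role of `None.into_iter()` in the post.
    let zero: Pairs<'a> = Box::new(std::iter::empty::<(&'a String, &'a u32)>());
    maps.iter().fold(zero, |acc, m| -> Pairs<'a> {
        // Without the Box, `acc.chain(m.iter())` would have a new type each step.
        Box::new(acc.chain(m.iter()))
    })
}

fn main() {
    let mut a: BTreeMap<String, u32> = BTreeMap::new();
    a.insert("x".to_string(), 1);
    let mut b: BTreeMap<String, u32> = BTreeMap::new();
    b.insert("y".to_string(), 2);
    let maps = vec![a, b];
    for (k, v) in chain_all(&maps) {
        println!("{}\t{}", k, v);
    }
}
```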

This is the new code for the section I changed. It compiles and seems to work correctly:

```
let counts = guards.into_iter().map(|x| x.join()).collect::<Vec<CountMap>>();
println!("shuffle/reduce.");
let counts_iter = counts.iter();
let zero: Box<Iterator<Item=(&String, &u32)>> = Box::new(None.into_iter());
let merged_counts: Box<Iterator<Item=(&String, &u32)>> = counts_iter.fold(zero, |acc, item| {
    Box::new(acc.merge_by(item.iter(), |a, b| a.0.cmp(b.0)))
});

let grouped = merged_counts.group_by(|t| t.0);

let aggregated = grouped.map(|(k, v)| {
    (k, v.iter().map(|&(_, b)| b ).sum::<u32>())
});
print_counts(aggregated);
```
  • I used a simpler way to create the zero iterator for the fold.
  • The important part is that we never move out of `counts`; it is the root owner of all the data in the iterators that follow.
  • References are `Copy`, so the closures are simple to write.
  • You could change the aggregation step to use references for the string keys and take the `u32` values by value.
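Following the last point, here is a sketch that keeps `counts` as the sole owner and aggregates into a map whose keys are `&str` borrowed from it, taking the `u32` values by value, so no key `String` is cloned. It replaces the merge/group pipeline with plain `BTreeMap` inserts, so it is not lazy; `aggregate_borrowed` is a hypothetical name.

```rust
use std::collections::BTreeMap;

type CountMap = BTreeMap<String, u32>;

/// Aggregates counts without cloning keys: every key in the result borrows from `counts`.
fn aggregate_borrowed(counts: &[CountMap]) -> BTreeMap<&str, u32> {
    let mut totals = BTreeMap::new();
    for map in counts {
        for (k, &v) in map {
            // `k.as_str()` borrows from `counts`; `v` is copied out of the map.
            *totals.entry(k.as_str()).or_insert(0) += v;
        }
    }
    totals
}

fn main() {
    let mut a = CountMap::new();
    a.insert("x".to_string(), 1);
    let mut b = CountMap::new();
    b.insert("x".to_string(), 2);
    b.insert("y".to_string(), 5);
    let counts = vec![a, b];
    for (k, v) in aggregate_borrowed(&counts) {
        println!("{}\t{}", k, v);
    }
}
```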