Count letter frequency in parallel

I'm working on a problem where, given a slice of strings and a number of workers, I need to spawn that many threads to count the frequency of letters. I came up with the following code:

use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::sync::mpsc::{self, SyncSender, Receiver};
use std::thread;
use chrono::offset::Local;

pub fn frequency(input: &[&str], worker_count: usize) -> HashMap<char, usize> {
    // https://doc.rust-lang.org/std/sync/mpsc/fn.sync_channel.html
    let (tx1, rx1) = mpsc::sync_channel(worker_count);
    let (tx2, rx2) = mpsc::sync_channel(input.len());
    let rx2 = Arc::new(Mutex::new(rx2));

    for i in 0..worker_count {
        worker(i, rx2.clone(), tx1.clone());
    }

    for line in input {
        tx2.send(line.to_string()).unwrap();
    }

    let mut freq = HashMap::new();
    for _ in 0..input.len() {
        let map = rx1.recv().unwrap();
        for (c, i) in map {
            *freq.entry(c).or_default() += i;
        }
    }
    freq
}

fn worker(id: usize, rx: Arc<Mutex<Receiver<String>>>, tx: SyncSender<HashMap<char, usize>>) {
    thread::spawn(move || loop {
        let mut hm = HashMap::new(); 
        match rx.lock() {
            Ok(rx) => {
                match rx.try_recv() {
                    Ok(s) => {
                        println!("[{:?}] worker-{} received: {}", Local::now(), id, s);
                        for c in s.as_str().chars() {
                            if c.is_ascii_alphabetic() {
                                *hm.entry(c.to_ascii_lowercase()).or_default() += 1;
                            }
                        }
                    },
                    Err(_) => {},
                }
            },
            Err(_) => {},
        };
        match tx.send(hm) {
            Err(e) => eprintln!("[{:?}] worker-{} failed to send, {:?}", Local::now(), id, e),
            _ => {},
        }
    });
}

But when called with input &["aA"] and 4 workers, the program fails with a not-so-helpful error message:

[2022-07-03T19:20:27.672701-07:00] worker-2 received: aA
[2022-07-03T19:20:27.673068-07:00] worker-2 failed to send, SendError { .. }

Why is it failing?

I was able to get the details of the error by changing from debug to display formatting.

worker-1 failed to send, sending on a closed channel

This is because there are more workers than input lines. The error goes away after changing mpsc::sync_channel(worker_count) to mpsc::sync_channel(cmp::min(input.len(), worker_count)), but I still don't understand how the channel was getting closed, causing the error.
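The error itself can be reproduced in isolation - send() fails with this exact message as soon as every receiver has been dropped. A minimal stand-alone sketch (not the exercise code):

```rust
use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::sync_channel::<i32>(1);
    // Once every receiver is gone, the channel counts as closed...
    drop(rx);
    // ...and send() fails with the error from the log above:
    // "sending on a closed channel".
    let err = tx.send(1).unwrap_err();
    println!("{}", err);
}
```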

I'd say you should replace the second channel with an MPMC (multi-producer, multi-consumer) channel from crossbeam; sharing the sender of an mpsc channel behind an Arc<Mutex<..>> is usually counter-productive.

The second channel is single-producer, multi-consumer; I'm sharing the receiver.
The following code works; however, there is one thing that's not clear to me:

// Create one less worker, so that we don't end up cloning more than needed.
// Without this, the program hangs.

use std::collections::HashMap;
use std::sync::mpsc::{self, Receiver, SyncSender, TryRecvError};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

// We create two channels, one for sending data to the workers, and one for
// receiving from them.
// Since there is no thread-safe multiple receiver construct, the receiver
// passed to the threads is protected by a mutex.
pub fn frequency(input: &[&str], worker_count: usize) -> HashMap<char, usize> {
    // https://doc.rust-lang.org/std/sync/mpsc/fn.sync_channel.html
    let (tx1, rx1) = mpsc::sync_channel(worker_count);
    let (tx2, rx2) = mpsc::sync_channel(input.len());
    let rx2 = Arc::new(Mutex::new(rx2));

    let mut workers = Vec::new();
    // Create one less worker, so that we don't end up cloning more than needed.
    // Without this, the program hangs.
    for i in 0..worker_count - 1 {
        workers.push(worker(i, Arc::clone(&rx2), tx1.clone()));
    }
    // The final worker gets the original references
    workers.push(worker(worker_count - 1, rx2, tx1));

    // Send all lines and close the channel
    for line in input {
        tx2.send(line.to_string())
            .unwrap_or_else(|_| panic!("[main] failed to send"));
    }
    drop(tx2);

    let mut freq = HashMap::new();
    // Loop until channel is disconnected
    for map in rx1 {
        for (c, i) in map {
            *freq.entry(c).or_default() += i;
        }
    }
    // Make sure there are no orphan threads
    for w in workers {
        let _ = w.join();
    }
    freq
}

fn worker(
    id: usize,
    rx: Arc<Mutex<Receiver<String>>>,
    tx: SyncSender<HashMap<char, usize>>,
) -> JoinHandle<()> {
    // At each iteration, try to acquire a lock on the receiver and read
    // a message.
    // If the channel is empty, release the lock.
    thread::spawn(move || loop {
        let mut hm = HashMap::new();
        // The shared receiver can only be accessed while the lock is
        // held; holding it also guarantees no other worker can steal
        // the line we are about to read.
        //
        // We panic on a poisoned lock to assert that we don't expect
        // threads to ever fail while holding it.
        let guard = rx
            .lock()
            .unwrap_or_else(|_| panic!("worker-[{}] failed to acquire lock", id));
        // This method will never block the caller in order to wait for
        // data to become available.
        match guard.try_recv() {
            Ok(line) => {
                for c in line.as_str().chars() {
                    if c.is_alphabetic() {
                        for x in c.to_lowercase() {
                            *hm.entry(x).or_default() += 1;
                        }
                    }
                }
                tx.send(hm)
                    .unwrap_or_else(|_| panic!("worker-[{}] failed to send", id));
            }
            // Disconnected: the sender was dropped and the buffer is
            // drained, so there is no more work.
            Err(TryRecvError::Disconnected) => break,
            // Empty: no message right now; release the lock and retry.
            Err(TryRecvError::Empty) => {}
        }
        drop(guard);
    })
}

This isn't responsive to your question, and please forgive me in advance for that (I'm not enough of a Rust expert yet), but... have you tried rayon? Seriously, it seems like a good utility for parallelizing code that is naturally partitionable.

I'm using it to build sparse matrices (ginormous ones) in parallel, and have been pleasantly surprised at how straightforward it is. You wrestle with types and lifetimes for a while, but once you're done with that, the parallel code and the serial code aren't so different.

Just a thought.

I haven't, because for this exercise I want to write the code myself, not have a library do the job for me. I'm not a Rust expert either, but from what I've seen, it would make this exercise almost trivial.

@RedDocMD Using crossbeam works nicely and cuts down the code too, but it still leaves the question: why does the program hang if I create one more clone of the sender/receiver than needed? Can you help me understand that?

// Create one less worker, so that we don't end up cloning more than needed.
// Without this, the program hangs.
for i in 0..worker_count - 1 {
    workers.push(worker(i, Arc::clone(&rx2), tx1.clone()));
}
// The final worker gets the original references
workers.push(worker(worker_count - 1, rx2, tx1));

Ah, laudable. Someday soon I hope to get there myself! And yeah, you're right in your assessment. I wish I knew enough about the various concurrency and thread-communication operations to be able to comment, and hopefully someday I will. But for now I'm trying to actually get work done, and that precludes learning things when an adequate library exists. Ah, well. Good hunting!

I think I figured it out myself. For the channel to disconnect, every clone of an endpoint, and the original itself, must go out of scope. An alternative is to create one extra clone for each worker and drop the originals immediately.
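For the record, the "one extra clone, drop the originals" alternative looks roughly like this (a stand-alone sketch, not the exercise code):

```rust
use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::sync_channel::<i32>(4);

    // Give every worker its own clone of the sender...
    let handles: Vec<_> = (0..4)
        .map(|i| {
            let tx = tx.clone();
            thread::spawn(move || tx.send(i).unwrap())
        })
        .collect();
    // ...and drop the original right away. Without this line the
    // iterator below never sees a disconnect, and the program hangs.
    drop(tx);

    // rx.iter() ends only once *all* senders are gone.
    let sum: i32 = rx.iter().sum();
    assert_eq!(sum, 0 + 1 + 2 + 3);

    for h in handles {
        h.join().unwrap();
    }
}
```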

For now I would recommend avoiding the standard library channels - they are pretty much broken; see issues such as "Panic in Receiver::recv()" (Issue #39364 · rust-lang/rust · GitHub). Use crossbeam-channel instead (or, for this particular use case, rayon).

In addition to crossbeam-channel, another option for an MPMC channel is flume.

COMPLETELY off topic - but any bug is a good topic to reconcile. Take care with parallelizing primitive operations like this: two CPUs are not faster than one if the bottleneck is DRAM or the L3 cache. The hash table in your dispatched work is maybe just enough that you can eke out some parallelism (if the strings are large enough), but the work has to be so much larger that it overcomes the overhead of L2 cache snooping (flushing the dispatching thread's L2 entries for the source strings, then flushing the hash entries back to the dispatcher when done) and the corresponding OS calls for mutexes and signaling.

I'm personally tortured by writing GPU code that ends up slower than CPU code, because the transfer time over the PCI bus is longer than the execution time. The same is true of simple rayon .par_iter() calls with too weak a mapping kernel: you burn 3x the CPU and only get 0.75x the wall-clock time. When you have a complex execution pipeline, this winds up slowing down the overall FPS/QPS or whatever metric you're worried about.

That said, random-access memory lookups in directed graphs (similar to your hash map) are good multi-threaded candidates - though your hash map should live within the L1 cache and is thus not likely to benefit greatly (each memory lookup takes only a couple of clocks). Massive multi-GB structures would be a great example where multi-threaded concurrency (even with hyper-threading) pays off, as all CPU threads wait ~50 clocks for the next DRAM load - but they wait in parallel.

SHAs would also be good parallelizing candidates, because each byte of each input string requires hundreds of instructions - all living in separate L1s (after the initial copy-over).
