Counting characters in parallel - lifetime error with closure and input data

Hi all, I'm learning Rust and currently writing a function to count character frequencies from an array of string slices in parallel.

use std::collections::HashMap;
use std::thread::{self, JoinHandle};
pub fn frequency(input: &[&str], worker_count: usize) -> HashMap<char, usize> {
    let batch_size = input.len() / worker_count;

    fn count_chars(input: Vec<&str>) -> HashMap<char, usize> {
        let mut counter = HashMap::<char, usize>::new();
        for s in input {
            for c in s.chars() {
                counter
                    .entry(c)
                    .and_modify(|count| *count += 1)
                    .or_insert(1);
            }
        }

        counter
    }

    // 1. split the input into worker_count batch-sizes of m
    let mut batched_inputs: Vec<Vec<&str>> = Vec::new();
    for i in 0..worker_count {
        batched_inputs.push(input[i * batch_size..i * batch_size + batch_size].to_vec());
    }

    // 2. spawn threads and pass in the copies of each slice to the workers
    let mut workers: Vec<JoinHandle<_>> = Vec::new();
    for batch in batched_inputs {
        workers.push(thread::spawn(move || count_chars(batch)));
    }

    // 3. each worker returns a hashmap
    let mut results = Vec::new();
    for handle in workers {
        results.push(handle.join().unwrap());
    }

    // 4. the main thread consolidates all the hashmaps into a single hashmap
    results
        .iter()
        .fold(HashMap::new(), |mut acc_counter, curr_counter| {
            for (k, v) in curr_counter {
                acc_counter
                    .entry(*k)
                    .and_modify(|counter| *counter += *v)
                    .or_insert(*v);
            }

            acc_counter
        })
}

However, I'm getting a compiler error about lifetimes and I don't quite get why that's the case here. I'd appreciate an explanation and advice on how I can fix the issue.

I'm also aware the code isn't optimal (e.g. not needing to store batched_inputs) but I just want to be explicit for my own learning. Thanks again.

Compiler error:

workers.push(thread::spawn(move || count_chars(batch)));
   |                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   |                      |
   |                      `input` escapes the function body here
   |                      argument requires that `'1` must outlive `'static`

I've tried removing move, but that doesn't work either.

the &str has an (elided) lifetime in its type. try the scoped threading api, see Scope and ScopedJoinHandle.


Updated to:

let mut workers: Vec<ScopedJoinHandle<_>> = Vec::new();
for batch in batched_inputs {
    workers.push(thread::scope(|s| s.spawn(move || count_chars(batch))));
}

Still getting a lifetime error, albeit a slightly different one:

workers.push(thread::scope(|s| s.spawn(move || count_chars(batch))));
                         -- ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ returning this value requires that `'1` must outlive `'2`

What lifetimes are '1 and '2 referring to?

Based on the docs, my understanding is that spawning a thread from the scope API ensures that the inputs referenced inside the thread are guaranteed to live as long as they do outside it (in this case, in the main thread)?

the lifetime bound is a constraint, not a guarantee, which means your usage of the API must meet the requirement.

the regular (non-scoped) thread API requires the thread procedure to be 'static because once it's spawned, the spawning thread has no control over it whatsoever. scoped threads can have references that are not 'static, because the spawning thread implicitly waits for (i.e. joins) all the child threads on scope exit.

so if you need to do anything with non-'static references, you must do it inside the scope callback; this includes joining the worker threads to collect the results.

so your example code can be rewritten using the scoped threading API like this:

    // 1. split the input into worker_count batch-sizes of m
    let mut batched_inputs: Vec<Vec<&str>> = Vec::new();
    for i in 0..worker_count {
        batched_inputs.push(input[i * batch_size..i * batch_size + batch_size].to_vec());
    }

    let results = thread::scope(|s| {
        // 2. spawn threads and pass in the copies of each slice to the workers
        let workers = batched_inputs
            .into_iter()
            .map(|batch| s.spawn(|| count_chars(batch)))
            .collect::<Vec<_>>();
        // 3. each worker returns a hashmap
        workers
            .into_iter()
            .map(|worker| worker.join().unwrap())
            .collect::<Vec<_>>()
    });
    // 4. the main thread consolidates all the hashmaps into a single hashmap
    results
        .iter()
        .fold(HashMap::new(), |mut acc_counter, curr_counter| {
            for (k, v) in curr_counter {
                acc_counter
                    .entry(*k)
                    .and_modify(|counter| *counter += *v)
                    .or_insert(*v);
            }

            acc_counter
        })

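you need to collect into an intermediate Vec<ScopedJoinHandle> to spawn the worker threads, because iterators are lazy: without the collect, each thread would be spawned and then joined before the next one is spawned, so the workers would run one at a time instead of in parallel.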
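A small experiment makes the difference visible (a sketch, not from the thread; the function names are made up). Chaining spawn and join through one lazy iterator means each thread is joined before the next one is spawned, so at most one worker is ever running; collecting the handles first lets them all run at once. The second function uses a std::sync::Barrier sized to n, which only releases when n threads are waiting on it at the same time, so it would deadlock if the threads ran one by one.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Barrier;
use std::thread;

/// Spawn and join in one lazy chain: `for_each` pulls one handle at a time,
/// so each thread is joined before the next one is spawned.
fn fused_max_concurrency(n: usize) -> usize {
    let running = AtomicUsize::new(0);
    let max_seen = AtomicUsize::new(0);
    thread::scope(|s| {
        (0..n)
            .map(|_| {
                s.spawn(|| {
                    let now = running.fetch_add(1, Ordering::SeqCst) + 1;
                    max_seen.fetch_max(now, Ordering::SeqCst);
                    running.fetch_sub(1, Ordering::SeqCst);
                })
            })
            .for_each(|handle| handle.join().unwrap());
    });
    max_seen.into_inner()
}

/// Collect the handles first, then join: all n threads are alive together.
/// The barrier only releases once n threads wait on it, so returning at all
/// shows the workers ran concurrently.
fn collected_sum(n: usize) -> usize {
    let barrier = Barrier::new(n);
    let barrier = &barrier; // a shared reference, so `move` closures just copy it
    thread::scope(|s| {
        let handles: Vec<_> = (0..n)
            .map(|i| {
                s.spawn(move || {
                    barrier.wait();
                    i
                })
            })
            .collect();
        handles.into_iter().map(|h| h.join().unwrap()).sum()
    })
}
```

With n = 4, the fused version never sees more than one running worker, while the collected version gets past the barrier and sums the thread indices.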

but collecting into an intermediate results: Vec<HashMap<char, usize>> is unnecessary; it can be fused into the final folding step.

so the code can be simplified further, like this:


use std::collections::HashMap;
use std::thread::{self, ScopedJoinHandle};

pub fn frequency(input: &[&str], worker_count: usize) -> HashMap<char, usize> {
    let batch_size = input.len() / worker_count;
    fn count_chars(input: &[&str]) -> HashMap<char, usize> {
        input
            .iter()
            .flat_map(|s| s.chars())
            .fold(HashMap::new(), |mut counter, c| {
                counter
                    .entry(c)
                    .and_modify(|count| *count += 1)
                    .or_insert(1);
                counter
            })
    }
    thread::scope(|s| {
        // 1. split the input into worker_count batches of batch_size items
        // 2. spawn threads and hand each worker a borrowed sub-slice (no copying);
        // `move` copies the `&[&str]` into the closure, otherwise the closure
        // would borrow the local `batch`, which does not live long enough
        // the two consecutive `map()`s can be combined into one, but I kept them
        // separate to resemble the original code structure
        // also, ScopedJoinHandle can be inferred
        let workers = (0..worker_count)
            .map(|i| &input[i * batch_size..i * batch_size + batch_size])
            .map(|batch| s.spawn(move || count_chars(batch)))
            .collect::<Vec<ScopedJoinHandle<HashMap<char, usize>>>>();
        // 3. each worker returns a hashmap
        // 4. the main thread consolidates all the hashmaps into a single hashmap
        workers
            .into_iter()
            .flat_map(|worker| worker.join().unwrap())
            .fold(HashMap::new(), |mut total, (k, v)| {
                total.entry(k).and_modify(|count| *count += v).or_insert(v);
                total
            })
    })
}
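A side note, separate from the lifetime question: both the original and this version compute input.len() / worker_count, which rounds down, so when the length isn't a multiple of worker_count the trailing strings are never counted, and worker_count == 0 panics with a division by zero. One possible variant (not part of the replies above) rounds the batch size up and lets slice::chunks build the batches, so it may spawn fewer than worker_count threads:

```rust
use std::collections::HashMap;
use std::thread;

fn count_chars(input: &[&str]) -> HashMap<char, usize> {
    let mut counter = HashMap::new();
    for c in input.iter().flat_map(|s| s.chars()) {
        *counter.entry(c).or_insert(0) += 1;
    }
    counter
}

pub fn frequency(input: &[&str], worker_count: usize) -> HashMap<char, usize> {
    // Treat 0 workers as 1 instead of dividing by zero.
    let workers = worker_count.max(1);
    // Round up so the trailing `input.len() % workers` strings are not dropped;
    // `max(1)` keeps `chunks` from panicking when `input` is empty.
    let batch_size = ((input.len() + workers - 1) / workers).max(1);
    thread::scope(|s| {
        // At most `workers` chunks; the last one may be shorter.
        let handles: Vec<_> = input
            .chunks(batch_size)
            .map(|batch| s.spawn(move || count_chars(batch)))
            .collect();
        handles
            .into_iter()
            .flat_map(|handle| handle.join().unwrap())
            .fold(HashMap::new(), |mut total, (k, v)| {
                *total.entry(k).or_insert(0) += v;
                total
            })
    })
}
```

For example, with 3 strings and 2 workers the original split makes two batches of one string each and skips the third, while this one makes batches of 2 and 1 strings.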

Thanks a lot @nerditation, that works great!

I'm still not sure I understand why my original code did not work. I'm trying to connect what I think I understand conceptually with the original compiler error (`input` escapes the function body here, argument requires that `'1` must outlive `'static`). Can you confirm whether my understanding here is correct:

  • thread::spawn requires its closure inputs (any variables used within the closure) to have a 'static lifetime. I had workers.push(thread::spawn(move || count_chars(batch))); where the compiler could not be certain that batch would live longer than the thread. (But wouldn't the move keyword have addressed this?)
  • With thread::scope, batched_inputs is consumed within the created scope, and the compiler is satisfied that the threads do not outlive batched_inputs because, as you mentioned, the main thread implicitly waits for the scope to end.

scoped threads can have references that are not 'static

By this, you mean that a scoped thread's closure can contain references that are not 'static, right?

the move keyword moves the variable batch into the closure, but the type of batch is Vec<&'a str>, where 'a is some lifetime chosen by the caller of frequency. if you spell out the elided lifetimes, the signature of the function frequency looks like this:

fn frequency<'a, 'b>(input: &'b [&'a str], worker_count: usize) -> HashMap<char, usize>;
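To see that it's the 'a inside Vec<&'a str>, not the move, that trips thread::spawn, here is a minimal sketch (the function names are made up): with the element type pinned to &'static str, the same move-into-a-thread pattern compiles, while the generic version in the comment is rejected for the same reason as the original frequency.

```rust
use std::thread;

// Hypothetical helper: `Vec<&'static str>` satisfies `'static`, so moving it
// into a `thread::spawn` closure is accepted.
fn total_chars_in_thread(batch: Vec<&'static str>) -> usize {
    thread::spawn(move || batch.iter().map(|s| s.chars().count()).sum::<usize>())
        .join()
        .unwrap()
}

// The generic version does not compile: `Vec<&'a str>` is only valid for
// `'a`, so the closure type is not `'static` even though `batch` is moved.
//
// fn total_chars_generic<'a>(batch: Vec<&'a str>) -> usize {
//     thread::spawn(move || batch.iter().map(|s| s.chars().count()).sum::<usize>())
//         .join()
//         .unwrap()
// }
```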

this is a correct way to interpret it.

the precise description is that the type of the closure (an anonymous, opaque type synthesized by the compiler) must be 'static, and here it isn't.

to explain the difference between regular threads and scoped threads, you first need to understand how the compiler synthesizes closure types. the type of a closure can be thought of as a struct with the captured variables as its fields, which implements one or more of the Fn family of traits, that is, FnOnce, FnMut and Fn.

by default, variables from the enclosing lexical scope are captured in the least demanding way. for example, if only methods with a &self receiver are called, the variable is captured by immutable reference; if some methods with a &mut self receiver are called, the variable is captured by mutable reference; if a method with a self receiver is called, the variable is captured by value (i.e. moved into the closure). here's an example:

struct Foo {}
impl Foo {
    fn by_ref(&self) {}
    fn by_mut(&mut self) {}
    fn by_value(self) {}
}

let a = Foo{};
let mut b = Foo{};
let c = Foo{};

let f = || {
    a.by_ref();
    b.by_mut();
    c.by_value();
};

// conceptually desugared like:
// following is pseudo code to explain the concept, not actual valid rust code
struct ClosureType<'a> {
    a: &'a Foo,
    b: &'a mut Foo,
    c: Foo,
}
impl<'a> FnOnce<()> for ClosureType<'a> {
    type Output = ();
    fn call_once(mut self, args: ()) -> Self::Output {
        self.a.by_ref();
        self.b.by_mut();
        self.c.by_value();
    }
}
let f = ClosureType {
    a: &a,
    b: &mut b,
    c,
};
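The desugaring above can't be compiled as-is on stable Rust, because implementing FnOnce by hand needs nightly-only features. As a stable-Rust sketch of the same idea, here is a hand-written closure struct with an ordinary call_once method standing in for the trait method; the calls field on Foo and the function name are made up so the effect is observable:

```rust
struct Foo {
    calls: u32,
}

impl Foo {
    fn by_ref(&self) -> u32 {
        self.calls
    }
    fn by_mut(&mut self) {
        self.calls += 1;
    }
    fn by_value(self) -> u32 {
        self.calls
    }
}

// Hand-written stand-in for the type the compiler would generate for
// `|| { a.by_ref(); b.by_mut(); c.by_value(); }`.
struct ClosureType<'a> {
    a: &'a Foo,     // only `&self` methods used: captured by shared reference
    b: &'a mut Foo, // a `&mut self` method used: captured by mutable reference
    c: Foo,         // a `self` method used: captured by value (moved)
}

impl<'a> ClosureType<'a> {
    // Takes `self` by value, like `FnOnce::call_once`: `c` is moved out of it.
    fn call_once(self) -> u32 {
        let seen = self.a.by_ref();
        self.b.by_mut();
        seen + self.c.by_value()
    }
}

fn run_desugared() -> (u32, u32, u32) {
    let a = Foo { calls: 1 };
    let mut b = Foo { calls: 0 };
    let c = Foo { calls: 10 };
    let f = ClosureType { a: &a, b: &mut b, c };
    let result = f.call_once();
    // `a` and `b` were only borrowed, so they are still usable here,
    // while `c` was moved into `f` and can no longer be named.
    (result, a.calls, b.calls)
}
```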

by adding a move keyword to the closure definition, all variables are captured by value, like this:

let f = move || {
    a.by_ref();
};

// conceptually desugared like:
struct ClosureType {
    a: Foo,
}
impl FnOnce<()> for ClosureType {
    type Output = ();
    fn call_once(self, args: ()) -> Self::Output {
        self.a.by_ref();
    }
}
let f = ClosureType { a };

the move keyword only means the variables are captured by value; it doesn't say anything about what the variables' types may be. in the above example, if the Foo type itself contains a lifetime generic parameter (instantiated with a non-'static lifetime), then the closure type cannot be 'static either.

struct Foo<'b> {
    msg: &'b str,
}
let foo: Foo = todo!();
let f = move || {
    println!("{}", foo.msg);
};

// with `move`, what you get is something like:
struct ClosureType<'b> {
    foo: Foo<'b>,
}
let f = ClosureType {
    foo,
};

notice that foo is captured by value (it's a Foo, not a &Foo), but the closure still is not 'static.

the only difference between std::thread::spawn() and std::thread::Scope::spawn() (besides JoinHandle vs ScopedJoinHandle) is that the former requires the closure type to be 'static, while the latter only requires it to outlive 'scope. the reason can be illustrated by the following snippet:

use std::thread::{self, JoinHandle};
use std::time::Duration;

fn bar() -> JoinHandle<()> {
    let foo = Foo {};
    // if we didn't require the closure to be 'static, the following would be allowed
    thread::spawn(|| {
        thread::sleep(Duration::from_secs(1));
        foo.by_ref();
    })
    // here the `bar()` function returns, so variable `foo` goes out of scope
    // but the thread we just spawned has not finished yet, and it captured
    // a reference to the variable `foo`, which is dropped already
}
fn baz() {
    let foo = Foo {};
    // this time, we use a scoped thread
    thread::scope(|s| {
        // this api only requires the closure type to outlive 'scope
        s.spawn(|| {
            thread::sleep(Duration::from_secs(1));
            foo.by_ref();
        });
        // because when the scope ends, all spawned threads are implicitly joined,
        // the scope cannot end before all threads have finished.
        // in other words, as long as `s` is live, the captured references are guaranteed
        // not to be dropped, and the thread can safely access them.
    });
    // and here `foo` can be safely dropped,
    // because the `Scope` object only exists inside the scope callback.
}
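A runnable variant of baz() (a sketch; the function name and the strings are made up), where the scoped threads borrow a local String that is not 'static, and one of them even mutably borrows a local counter. thread::spawn would reject both closures, but thread::scope accepts them because every scoped thread is joined before scope returns:

```rust
use std::thread;

fn scoped_borrow_demo() -> (usize, usize) {
    let local = String::from("hello scoped threads"); // owned here, not 'static
    let mut spaces = 0;
    let len = thread::scope(|s| {
        // shared borrow of `local`
        let handle = s.spawn(|| local.len());
        // mutable borrow of `spaces`, plus another shared borrow of `local`
        s.spawn(|| spaces = local.matches(' ').count());
        handle.join().unwrap()
        // the second thread is joined implicitly when the scope ends
    });
    // all threads have finished, so `spaces` can be read again
    (len, spaces)
}
```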

This is invaluable, thank you @nerditation!

I have one more follow-up question though. If the only difference between the two ways of spawning threads is that thread::spawn requires its closure to be 'static, then the code below should not compile. But I ran it and it works fine. Why does it work, given the explanation above?

    struct Foo<'a> {
        msg: &'a str,
    }
    
    impl<'a> Foo<'a> {
        fn by_ref(&self) {}
    }

    fn bar() {
        let foo = Foo {
            msg: "bar hello world!",
        };
        thread::spawn(move || {
            thread::sleep(Duration::from_secs(1));
            foo.by_ref();
        });

        // foo.by_ref() // this would not compile
    }

    bar();

As you explained above, move causes the closure to capture foo by value. But Foo has a field with a generic lifetime parameter ('a), which I assumed is non-'static, and you mentioned that thread::spawn requires its closure type to be 'static.

foo in the example is a Foo<'static>, which satisfies the 'static bound.[1] Make this change to see it fail.

     fn bar() {
+        let local = "bar hello world!".to_string();
         let foo = Foo {
-            msg: "bar hello world!",
+            msg: &*local,
         };

  1. "literal strs" are &'static strs.
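A sketch of the two cases side by side (the function names and the require_static helper are hypothetical; require_static just imposes the same T: 'static bound that thread::spawn puts on its closure). A Foo built from a string literal is a Foo<'static> and passes; for a String created inside the function, the usual way to satisfy thread::spawn is to move the owned String into the thread rather than a Foo that borrows it:

```rust
use std::thread;

struct Foo<'a> {
    msg: &'a str,
}

// Hypothetical helper: only accepts values whose type is `'static`,
// the same bound `thread::spawn` places on its closure.
fn require_static<T: 'static>(value: T) -> T {
    value
}

fn spawn_with_literal() -> usize {
    // A string literal is a `&'static str`, so this is a `Foo<'static>`.
    let foo = require_static(Foo { msg: "bar hello world!" });
    thread::spawn(move || foo.msg.len()).join().unwrap()
}

fn spawn_with_owned() -> usize {
    let local = "bar hello world!".to_string();
    // `Foo { msg: &*local }` would be a `Foo<'_>` borrowing `local`, and both
    // `require_static` and `thread::spawn` would reject it.
    // Moving the owned `String` into the thread satisfies `'static` instead.
    thread::spawn(move || local.len()).join().unwrap()
}
```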
