Tokio sharing data across threads

Not sure if it's appropriate to ask crate-specific questions here, but I have a question about Tokio.
I want to learn how threads can modify data structures created in the main thread.
Here's a simple example that doesn't compile because the compiler says wrapped_map does not live long enough.
I imagine there is an easy fix that I'm not seeing.

use std::collections::HashMap;
use std::sync::{Arc, Mutex};

type DogMap = Arc<Mutex<HashMap<String, String>>>; // name to breed

async fn add_nelson_dogs(dog_map: &DogMap) {
    let mut map = dog_map.lock().unwrap();
    map.insert("Maisey".to_string(), "Treeing Walker Coonhound".to_string());
    map.insert(
        "Oscar".to_string(),
        "German Shorthaired Pointer".to_string(),
    );
    // Do I need to explicitly unlock here?
}

async fn add_volkmann_dogs(dog_map: &DogMap) {
    let mut map = dog_map.lock().unwrap();
    map.insert("Comet".to_string(), "Whippet".to_string());
    // Do I need to explicitly unlock here?
}

#[tokio::main]
async fn main() {
    let map: HashMap<String, String> = HashMap::new();
    let wrapped_map = Arc::new(Mutex::new(map));
    // These two lines are where the compiler reports that
    // wrapped_map does not live long enough.
    tokio::spawn(add_nelson_dogs(&wrapped_map)).await;
    tokio::spawn(add_volkmann_dogs(&wrapped_map)).await;
    let map = wrapped_map.lock().unwrap();
    println!("{:?}", map);
}

Instead of passing an &DogMap, just take a DogMap without the ampersand and clone it for each call.

Check out this page: Shared state | Tokio

Wouldn't that be really inefficient if the HashMap gets large? I'm assuming that cloning a HashMap also clones all the keys and values it holds. Is that correct? I'm really after learning how to share data between threads without making copies.

No, that's the purpose of an Arc. When you clone an Arc, the value inside it is not cloned.

Of course, if it were not wrapped in an Arc, then it would indeed be expensive.
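
A quick standalone sketch (not your code) that demonstrates this: cloning the Arc only bumps a reference count, and both clones point at the same allocation.

use std::collections::HashMap;
use std::sync::{Arc, Mutex};

fn main() {
    let map: HashMap<String, String> = HashMap::new();
    let a = Arc::new(Mutex::new(map));
    // Cloning the Arc does not copy the HashMap inside it;
    // it just adds another owner of the same allocation.
    let b = Arc::clone(&a);
    assert!(Arc::ptr_eq(&a, &b)); // same allocation
    assert_eq!(Arc::strong_count(&a), 2); // two owners now
}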


Ah, that makes sense. So now I have the following for my main method and it works. Does this look like the best approach for what I'm doing? It seems I have to explicitly ignore the value returned on the spawn lines ... or maybe I should be doing something with those.

#[tokio::main]
async fn main() {
    let map: HashMap<String, String> = HashMap::new();
    let dog_map = Arc::new(Mutex::new(map));
    // The clone calls here clone the Arc, not the HashMap inside it.
    let _ = tokio::spawn(add_nelson_dogs(dog_map.clone())).await;
    let _ = tokio::spawn(add_volkmann_dogs(dog_map.clone())).await;
    let map = dog_map.lock().unwrap();
    println!("dog_map contains {:?}", map);
}

tokio::spawn(...).await is not what you want. It's the same as std::thread::spawn(...).join(), but async.

Basically, you hire a worker for some task, then do nothing but wait for the worker to finish its job before processing the rest. There is no concurrency or parallelism here, only the additional cost of hiring. If you want to run two tasks in parallel, store the JoinHandles and .await them later. You can follow the example for std::thread::spawn(); it's synchronous, but the pattern is the same.
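
For reference, here is the synchronous version of that pattern with std::thread (a minimal sketch, not your dog-map code):

use std::thread;

fn main() {
    // Spawn both threads first so they run concurrently...
    let handle1 = thread::spawn(|| println!("task 1"));
    let handle2 = thread::spawn(|| println!("task 2"));
    // ...and only then wait for each of them to finish.
    handle1.join().unwrap();
    handle2.join().unwrap();
}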

Are you suggesting something like this?

    let futures = vec![
        tokio::spawn(add_nelson_dogs(dog_map.clone())),
        tokio::spawn(add_volkmann_dogs(dog_map.clone())),
    ];
    join_all(futures).await;

This is fine:

let handle1 = tokio::spawn(add_nelson_dogs(dog_map.clone()));
let handle2 = tokio::spawn(add_volkmann_dogs(dog_map.clone()));
handle1.await.unwrap();
handle2.await.unwrap();

I strongly recommend that you stay far away from join_all. It has several problems: it's slow, and it makes it easy to forget to handle errors, as you did in your example, because it doesn't warn you when you forget to handle them.


What would you recommend for a case where there are a large number of threads doing work and you need to wait for all of them to complete before doing something else? Is there an alternative to join_all?

If you have spawned them with tokio::spawn, just use a loop.

for handle in handles {
    handle.await.unwrap();
}

Otherwise, if the futures you would put in join_all are not join handles, use buffered or buffer_unordered from the futures crate. For example, to call an async function on every item in a vector:

use futures::stream::StreamExt; // needed for map, buffered, and collect on streams

let vec: Vec<_> = futures::stream::iter(items)
    .map(|item| some_async_method(item))
    .buffered(20)
    .collect()
    .await;

Here the 20 is an upper limit on the number of concurrent operations. You can just use the length of the vector for no limit.

For even more control over how they are executed, you can use FuturesUnordered.
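
For example, a minimal sketch (some_async_method and the items here are just placeholders):

use futures::stream::{FuturesUnordered, StreamExt};

// Placeholder async work item standing in for some_async_method above.
async fn some_async_method(item: u32) -> u32 {
    item * 2
}

#[tokio::main]
async fn main() {
    let items = vec![1, 2, 3];
    let mut tasks: FuturesUnordered<_> =
        items.into_iter().map(some_async_method).collect();

    // Results arrive in completion order, not submission order.
    while let Some(result) = tasks.next().await {
        println!("{result}");
    }
}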

