Variable moved due to use in generator even when it is in Arc

I'd like to iterate over a HashMap and process the keys and values separately using join_all, as in the following code:

use std::{collections::HashMap, sync::Arc};
use futures_util::future::join_all;

async fn function(db: &str, table: &str, user: &str) {
    println!("{}.{} for user: {}", db, table, user);
}

#[tokio::main]
async fn main() {
    let mut maps = HashMap::with_capacity(50);
    let db_1 = "db_1".to_owned();
    let db_2 = "db_2".to_owned();
    let tables_1 = vec!["a", "b"];
    let tables_2 = vec!["c", "d"];
    maps.entry(db_1).or_insert(tables_1);
    maps.entry(db_2).or_insert(tables_2);

    let user = Arc::new("user");

    let tasks = maps
        .into_iter()
        .map(move |(db, tables)| async move {
            let subtasks = tables.into_iter().map(move |table| {
                let db = db.clone();
                let user = user.clone();
                async move {
                    function(db.as_str(), table, &user).await;
                }
            });
            join_all(subtasks).await;
        });
    join_all(tasks).await;
}

But the compiler complains "variable moved due to use in generator" when I try to clone the Arc user:

Compiling playground v0.0.1 (/playground)
error[E0507]: cannot move out of `user`, a captured variable in an `FnMut` closure
  --> src/main.rs:22:34
   |
18 |       let user = Arc::new("user");
   |           ---- captured outer variable
...
22 |           .map(move |(db, tables)| async move {
   |  ______________-------------------_^
   | |              |
   | |              captured by this `FnMut` closure
23 | |             let subtasks = tables.into_iter().map(move |table| {
24 | |                 let db = db.clone();
25 | |                 let user = user.clone();
   | |                            ----
   | |                            |
   | |                            variable moved due to use in generator
   | |                            move occurs because `user` has type `Arc<&str>`, which does not implement the `Copy` trait
...  |
30 | |             join_all(subtasks).await;
31 | |         });
   | |_________^ `user` is moved here

For more information about this error, try `rustc --explain E0507`.
error: could not compile `playground` (bin "playground") due to previous error

This confused me, because I thought wrapping a variable in an Arc was the standard way to use it multiple times in closures.

Can somebody help me with this error? Thank you!

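You need to clone the Arc outside the move closure (or async move block) that uses it, and move the clone in. A move closure or async move block takes ownership of everything it captures, including the original Arc instance. So cloning it inside is useless: the original has already been moved in, and since the outer closure is an FnMut that may run more than once, it cannot give its captured Arc away.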

    let tasks = maps.into_iter().map({
        |(db, tables)| {
            // capture user by reference to call the closure multiple times
            let user = user.clone(); 
            async move { // move the clone of user
                let subtasks = tables.into_iter().map(|table| {
                    // capture user by reference to call the closure multiple times
                    let db = db.clone();
                    let user = user.clone();
                    async move { // move the clone of user
                        function(db.as_str(), table, &user).await;
                    }
                });
                join_all(subtasks).await;
            }
        }
    });

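The key is to realize that Iterator::map takes an F: FnMut(Self::Item) -> B. The closure may be called many times, but a captured value can only be moved out once. So don't move the captured value itself into the async block: let the closure capture it by reference, clone it on each call, and move only the clone.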
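
To make the FnMut point concrete without any async machinery, here is a minimal std-only sketch. The helper make_jobs and the boxed FnOnce jobs are hypothetical stand-ins for the async blocks in this thread; the idea is only that the closure passed to map borrows the Arc and hands a fresh clone to each job.

```rust
use std::sync::Arc;

// Hypothetical helper (not from the thread): builds one job per table name.
// Each job owns its own clone of the Arc.
fn make_jobs(user: Arc<String>, tables: Vec<&'static str>) -> Vec<Box<dyn FnOnce() -> String>> {
    tables
        .into_iter()
        .map(|table| {
            // This closure only borrows `user`, so `map` may call it repeatedly.
            let user = Arc::clone(&user);
            // Only the per-call clone is moved into the job.
            Box::new(move || format!("{} for user: {}", table, user))
                as Box<dyn FnOnce() -> String>
        })
        .collect()
}

fn main() {
    let user = Arc::new("alice".to_owned());
    let jobs = make_jobs(Arc::clone(&user), vec!["a", "b"]);
    for job in jobs {
        println!("{}", job());
    }
}
```

If the closure passed to map were marked move and then moved user itself into the job, the first call would take the only copy, which is the situation E0507 rejects.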

Do you need the second user.clone()?

Alternatively, I'd prefer using for loops rather than Iterator methods. That way you reduce the boilerplate of all the closures and get rid of one user.clone() call:

use futures_util::future::join_all;
use std::{collections::HashMap, sync::Arc};

async fn function(db: &str, table: &str, user: &str) {
    println!("{}.{} for user: {}", db, table, user);
}

#[tokio::main]
async fn main() {
    let mut maps = HashMap::with_capacity(50);
    let db_1 = "db_1".to_owned();
    let db_2 = "db_2".to_owned();
    let tables_1 = vec!["a", "b"];
    let tables_2 = vec!["c", "d"];
    maps.entry(db_1).or_insert(tables_1);
    maps.entry(db_2).or_insert(tables_2);

    let user = Arc::new("user");

    let mut tasks = Vec::new();
    
    for (db, tables) in maps {
        for table in tables {
            let db = db.clone();
            let user = user.clone();
            
            tasks.push(async move {
                function(db.as_str(), table, &user).await;
            });
        }    
    }
    
    join_all(tasks).await;
}

Yes, because otherwise our user clone gets moved into the first task only.

Yes, but since you only need a reference, you could also do this instead:

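use futures_util::future::join_all;
use std::{collections::HashMap, sync::Arc};

async fn function(db: &str, table: &str, user: &str) {
    println!("{}.{} for user: {}", db, table, user);
}

#[tokio::main]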
async fn main() {
    let mut maps = HashMap::with_capacity(50);
    let db_1 = "db_1".to_owned();
    let db_2 = "db_2".to_owned();
    let tables_1 = vec!["a", "b"];
    let tables_2 = vec!["c", "d"];
    maps.entry(db_1).or_insert(tables_1);
    maps.entry(db_2).or_insert(tables_2);

    let user = Arc::new("user");

    let tasks = maps.into_iter().map({
        |(db, tables)| {
            // capture user by reference to call the closure multiple times
            let user = user.clone(); 
            async move { // move the clone of user
                let subtasks = tables.into_iter().map(|table| {
                    // the closure captures user by reference, so it can be called multiple times
                    let db = db.clone();
                    let user = &user; // borrow instead of cloning again
                    async move { // moves only the reference, not the Arc
                        function(db.as_str(), table, user).await;
                    }
                });
                join_all(subtasks).await;
            }
        }
    });
    join_all(tasks).await;
}

Edit

Apparently the whole Arc shtick isn't necessary here; references suffice.

You're right about this. The reason I'd like to use join_all is that I want to build several groups of tasks and process each group of tasks as a whole.

You're right in this case. I gave the variable user the type &str here just for simplification; in my real code it's a sqlx::Pool<MySql> instance, and I think I have to use an Arc in that case, am I right?

Yeah, it's possible that this only works because we have &'static str here, thanks to the string literals. Not sure though^^ I don't really know how lifetimes work with async and multithreading.

It works for this, though:

use rand::random;
use futures_util::future::join_all;
use std::collections::HashMap;

async fn function(db: &str, table: &str, user: &str) {
    println!("{}.{} for user: {}", db, table, user);
}

#[tokio::main]
async fn main() {
    let mut maps = HashMap::with_capacity(50);
    let db_1 = "db_1".to_owned();
    let db_2 = "db_2".to_owned();
    let tables_1 = vec!["a", "b"];
    let tables_2 = vec!["c", "d"];
    maps.entry(db_1).or_insert(tables_1);
    maps.entry(db_2).or_insert(tables_2);

    let user: String = random::<char>().to_string();

    let tasks = maps.into_iter().map({
        |(db, tables)| {
            // capture user by reference to call the closure multiple times
            let user = &user;
            async move { // moves only the reference to user
                let subtasks = tables.into_iter().map(|table| {
                    // capture user by reference to call the closure multiple times
                    let db = db.clone();
                    let user = &user;
                    async move { // moves only the reference to user
                        function(db.as_str(), table, user).await;
                    }
                });
                join_all(subtasks).await;
            }
        }
    });
    join_all(tasks).await;
}
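
On the open question of whether borrowing only works because of &'static str: as far as I understand it, join_all polls its futures inside the current task and has no 'static bound, so those futures may borrow locals that outlive the .await. tokio::spawn, by contrast, requires a 'static future, and that is where an Arc or another owned handle comes in. The same split exists for plain threads, which allows a dependency-free sketch. Note that this is an analogy built on std::thread, not the tokio API, and run_scoped and run_spawned are hypothetical names.

```rust
use std::sync::Arc;
use std::thread;

// Borrowing: scoped threads are joined before `scope` returns,
// so a plain `&str` that outlives the scope is enough.
fn run_scoped(user: &str, tables: &[&str]) -> Vec<String> {
    thread::scope(|s| {
        let handles: Vec<_> = tables
            .iter()
            .map(|table| s.spawn(move || format!("{} for user: {}", table, user)))
            .collect();
        handles
            .into_iter()
            .map(|h| h.join().unwrap())
            .collect::<Vec<String>>()
    })
}

// Owning: `thread::spawn` requires `'static`, so each thread gets its own Arc clone.
fn run_spawned(user: Arc<String>, tables: Vec<&'static str>) -> Vec<String> {
    let handles: Vec<_> = tables
        .into_iter()
        .map(|table| {
            let user = Arc::clone(&user);
            thread::spawn(move || format!("{} for user: {}", table, user))
        })
        .collect();
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}

fn main() {
    let user = "alice".to_owned();
    println!("{:?}", run_scoped(&user, &["a", "b"]));
    println!("{:?}", run_spawned(Arc::new(user), vec!["c", "d"]));
}
```

For the sqlx case, its documentation describes Pool as a cheaply cloneable, reference-counted handle, so it is probably enough to pass &pool with join_all or to clone the pool per spawned task, without wrapping it in an extra Arc.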
