Multithreading in Rust

#[post("/users/add-role", format = "json", data = "<role_info>")]
pub fn create_role(role_info: Json<UserInputRole>) -> Value {

    let handles: Vec<_> = (0..10).map(|i| {
        thread::spawn(move || {
            println!("Thread {} started", i);
            services::users::add_role(&role_info.role_name);
            println!("Thread {} finished", i);
        })
    }).collect();                           

    for handle in handles {
        handle.join().unwrap();
    }

    return Null;
}

Above is my POST endpoint, where I am trying to use multithreading, but I get the error below.

error[E0507]: cannot move out of `role_info`, a captured variable in an `FnMut` closure
  --> src/routes/users.rs:50:23
   |
47 | pub fn create_role(role_info: Json<UserInputRole>) -> Value {
   |                    --------- captured outer variable
48 |
49 |     let handles: Vec<_> = (0..10).map(|i| {
   |                                       --- captured by this `FnMut` closure
50 |         thread::spawn(move || {
   |                       ^^^^^^^ `role_info` is moved here
51 |             println!("Thread {} started", i);
52 |                 services::users::add_role(&role_info.role_name);
   |                                            ---------
   |                                            |
   |                                            variable moved due to use in closure
   |                                            move occurs because `role_info` has type `rocket::serde::json::Json<UserInputRole>`, which does not implement the `Copy` trait

What changes are required?

Feel free to take a look at this thread to make your question more readable.

The compiler moves role_info instead of a reference to role_info.role_name here. That means the first thread takes ownership of role_info, leaving the other threads with nothing to reference, causing your error. You must explicitly tell the compiler that you only wish to move a reference (which implements Copy so moving it will move a copy) to role_info.role_name by binding it to a variable:

#[post("/users/add-role", format = "json", data = "<role_info>")]
pub fn create_role(role_info: Json<UserInputRole>) -> Value {
    // bind &role_info.role_name to a variable which is then moved
    // into each thread's closure 
    let role_name = &role_info.role_name;

    thread::scope(|s| {
        for i in 0..10 {
            s.spawn(move || {
                println!("Thread {} started", i);
                services::users::add_role(role_name);
                println!("Thread {} finished", i);
            });
        }
    });

    return Null;
}

error[E0597]: `role_info` does not live long enough
   --> src/routes/users.rs:131:22
    |
128 |   pub fn create_role(role_info: Json<UserInputRole>) -> Value {
    |                      --------- binding `role_info` declared here
...
131 |       let role_name = &role_info.role_name;
    |                        ^^^^^^^^^ borrowed value does not live long enough
...
134 | /         thread::spawn(move || {
135 | |             println!("Thread {} started", i);
136 | |                 services::users::add_role(role_name);
137 | |             println!("Thread {} finished", i);
138 | |         })
    | |__________- argument requires that `role_info` is borrowed for `'static`
...
146 |   }
    |   - `role_info` dropped here while still borrowed

I am getting the error shown above.

Other code for reference:

pub fn add_role(role_name: &str) -> Value {
    use workfall_rocket_rs::schema::roles;

    let id = uuid::Uuid::new_v4().to_string();

    let connection = &mut establish_connection();

    let new_role: NewRole = NewRole { id: &id, role_name };

    let created_role: Role = diesel::insert_into(roles::table)
        .values(&new_role)
        .get_result::<Role>(connection)
        .expect("Error saving new role");

    json!(created_role)
}

Threads can outlive their parents, so you need to either promise you won't do that with scoped threads or clone the data so you can move it in.
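
A minimal sketch of those two options, using only the standard library. The `add_role` function here is a hypothetical stand-in that just formats a string (it is not the diesel-backed function from the question), and `with_scope` / `with_clone` are names made up for illustration.

```rust
use std::thread;

// Hypothetical stand-in for `services::users::add_role`; it only formats a string.
fn add_role(role_name: &str) -> String {
    format!("added {role_name}")
}

// Option 1: scoped threads may borrow `role_name`, because `thread::scope`
// joins every spawned thread before it returns.
fn with_scope(role_name: &str, n: usize) -> Vec<String> {
    thread::scope(|s| {
        let handles: Vec<_> = (0..n)
            .map(|_| s.spawn(move || add_role(role_name)))
            .collect();
        handles.into_iter().map(|h| h.join().unwrap()).collect()
    })
}

// Option 2: `thread::spawn` needs `'static` data, so each thread gets
// its own owned copy of the string.
fn with_clone(role_name: &str, n: usize) -> Vec<String> {
    let handles: Vec<_> = (0..n)
        .map(|_| {
            let owned = role_name.to_owned();
            thread::spawn(move || add_role(&owned))
        })
        .collect();
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}
```

Calling `with_scope("admin", 3)` or `with_clone("admin", 3)` should both give three results; the scoped version avoids the extra allocations, while the cloning version also works when the threads are not joined before the function returns.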

@simonbuchan is right, I forgot about the 'static bound on the closure that is passed to thread::spawn. I updated my example to use thread::scope instead, which allows non-static captures in the spawned closures.

That helped, thank you.

I have a follow-up question about the multithreaded POST endpoint above. After calling it, only a single role_name is added to Postgres, and I get the following output.

Thread 0 started
Thread 1 started
Thread 2 started
Thread 6 started
Thread 5 started
Thread 3 started
Thread 7 started
Thread 4 started
Thread 8 started
Thread 9 started
Thread 8 finished
thread 'thread '<unnamed><unnamed>' panicked at '' panicked at 'Error saving new role: DatabaseError(UniqueViolation, "duplicate key value violates unique constraint \"roles_role_name_key\"")Error saving new role: DatabaseError(UniqueViolation, "duplicate key value violates unique constraint \"roles_role_name_key\"")', ', src/services/users.rssrc/services/users.rs::3232::1010

note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
thread 'thread '<unnamed><unnamed>' panicked at '' panicked at 'Error saving new role: DatabaseError(UniqueViolation, "duplicate key value violates unique constraint \"roles_role_name_key\"")Error saving new role: DatabaseError(UniqueViolation, "duplicate key value violates unique constraint \"roles_role_name_key\"")', ', src/services/users.rssrc/services/users.rs::3232::1010

thread '<unnamed>' panicked at 'Error saving new role: DatabaseError(UniqueViolation, "duplicate key value violates unique constraint \"roles_role_name_key\"")', src/services/users.rs:32:10
thread '<unnamed>' panicked at 'Error saving new role: DatabaseError(UniqueViolation, "duplicate key value violates unique constraint \"roles_role_name_key\"")', src/services/users.rs:32:10
thread '<unnamed>' panicked at 'Error saving new role: DatabaseError(UniqueViolation, "duplicate key value violates unique constraint \"roles_role_name_key\"")', src/services/users.rs:32:10
thread '<unnamed>' panicked at 'Error saving new role: DatabaseError(UniqueViolation, "duplicate key value violates unique constraint \"roles_role_name_key\"")', src/services/users.rs:32:10
thread '<unnamed>' panicked at 'Error saving new role: DatabaseError(UniqueViolation, "duplicate key value violates unique constraint \"roles_role_name_key\"")', src/services/users.rs:32:10

This is the model I am passing:

#[derive(Deserialize)]
pub struct UserInputRole {
    pub role_name: String,
}

Postman JSON body:
{
"role_name":"USE63"
}

I have shared the add_role function above. I want to add multiple users at a time, using all the threads.

What should I do?

Looks like there's a UNIQUE constraint on the role_name column of your table (roles_role_name_key is the name of that constraint). Maybe remove that requirement? It is also pretty wasteful to spawn threads to insert values simultaneously into a database; you should consider combining all insertions into a single transaction.

I didn't quite follow. I am a beginner and I want to implement multithreading in Rust. Could you please provide an example to help me out?

I meant that it would make much more sense from a perspective of performance and readability to create 10 versions of NewRole and pass them all in a single vector to .values(...), all in a single thread. But if this is a toy project where you want to experiment with multi-threading, just go ahead and use multiple threads.

I'm not really familiar with diesel, but concerning the error you currently get: I think you have some SQL file in your project where you create the roles table. The role_name column in that table has a UNIQUE constraint (roles_role_name_key in the error message is the name of that constraint). That means all values in that column must be unique; two rows cannot have the same role_name. All ten of your threads insert the same role_name, so only one insert can succeed and the others fail. If you remove that UNIQUE constraint I believe your error will go away. Or, if the column must be UNIQUE, you must change your logic to make sure you don't insert the same value into that column twice.
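
To illustrate why only one of the ten inserts goes through, here is a rough sketch with no database involved. `RoleTable` is a hypothetical in-memory stand-in for the roles table, where a `HashSet` plays the part of the UNIQUE constraint, and `insert_same_name` is a made-up helper. Unlike the `add_role` in the question, this version returns a `Result` instead of panicking via `expect`.

```rust
use std::collections::HashSet;
use std::sync::Mutex;
use std::thread;

// Hypothetical in-memory stand-in for the `roles` table; the `HashSet`
// plays the part of the UNIQUE constraint on `role_name`.
struct RoleTable {
    names: Mutex<HashSet<String>>,
}

impl RoleTable {
    fn new() -> Self {
        RoleTable { names: Mutex::new(HashSet::new()) }
    }

    // Returns an error instead of panicking when the name is already taken.
    fn add_role(&self, role_name: &str) -> Result<(), String> {
        let mut names = self.names.lock().unwrap();
        if names.insert(role_name.to_owned()) {
            Ok(())
        } else {
            Err(format!("duplicate role name: {role_name}"))
        }
    }
}

// Spawns `n` threads that all try to insert the same name and returns
// (successful inserts, rejected inserts).
fn insert_same_name(table: &RoleTable, role_name: &str, n: usize) -> (usize, usize) {
    let results: Vec<Result<(), String>> = thread::scope(|s| {
        let handles: Vec<_> = (0..n)
            .map(|_| s.spawn(move || table.add_role(role_name)))
            .collect();
        handles.into_iter().map(|h| h.join().unwrap()).collect()
    });
    let ok = results.iter().filter(|r| r.is_ok()).count();
    (ok, results.len() - ok)
}
```

With ten threads and one name, exactly one thread wins and nine are rejected, whichever thread happens to get the lock first. Returning the error to the caller, rather than panicking inside the thread, also lets the endpoint report the failure instead of only printing it.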

Yes, I am only experimenting. I just want to implement it to test how it works.

You said to go ahead and use multiple threads, so what I am doing now is what you meant, right?

Yes, you are right. Your solution should work. I was just saying that you can make it more performant with a different design, for example if you write a web service that other people use and you have to care about performance and response times.

#[post("/users/add-role", format = "json", data = "<role_info>")]
pub fn create_role(role_info: Json<UserInputRole>) -> Value {
    // bind &role_info.role_name to a variable which is then moved
    // into each thread's closure 
    let role_name = &role_info.role_name;

    thread::scope(|s| {
        for i in 0..10 {
            s.spawn(move || {
                println!("Thread {} started", i);
                services::users::add_role(role_name);
                println!("Thread {} finished", i);
            });
        }
    });

    return Null;
}

What should I change so that the number of threads equals the number of users I send from Postman, with each thread adding one user to the database?

I'm not quite sure I follow. Do you want to send a list of UserInputRole instances to your endpoint and then for each UserInputRole spawn a new thread that saves it to the database?

Yes, that is exactly what I want to do.

I don't want to steal a valuable learning experience from you, so here's a hint:

Change your function signature to

pub fn create_roles(role_infos: Json<Vec<UserInputRole>>) -> Value

and see if you are able to update your function body accordingly. Another hint: the chapter "Storing Lists of Values with Vectors" in The Rust Programming Language gives you a helpful example for what you have to change in your function body. Hope that gives you the tools to solve this on your own.

I tried, but I didn't get the output I expected. Please help me out.

This should work, I think:

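#[post("/users/add-role", format = "json", data = "<role_infos>")]
pub fn create_role(role_infos: Json<Vec<UserInputRole>>) -> Value {
    thread::scope(|s| {
        // the name in `data = "<...>"` must match the parameter name;
        // `into_inner` unwraps the `Json` wrapper so the vector can be consumed
        for (i, role_info) in role_infos.into_inner().into_iter().enumerate() {
            // each thread takes ownership of one `UserInputRole`
            s.spawn(move || {
                println!("Thread {} started", i);
                services::users::add_role(&role_info.role_name);
                println!("Thread {} finished", i);
            });
        }
    });

    return Null;
}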
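
If you want to try the one-thread-per-input idea without Rocket or a database, a standalone sketch could look like the following. The `add_role` function is a hypothetical stand-in that only formats a string, `UserInputRole` mirrors the struct from the thread without the serde derive, and `create_roles` returning a `Vec<String>` is an assumption made so the results can be inspected.

```rust
use std::thread;

// Mirrors the `UserInputRole` struct from the thread (without serde).
struct UserInputRole {
    role_name: String,
}

// Hypothetical stand-in for `services::users::add_role`.
fn add_role(role_name: &str) -> String {
    format!("created role {role_name}")
}

// Spawns one scoped thread per input and returns the results in input order.
fn create_roles(role_infos: Vec<UserInputRole>) -> Vec<String> {
    thread::scope(|s| {
        let handles: Vec<_> = role_infos
            .iter()
            .map(|info| s.spawn(move || add_role(&info.role_name)))
            .collect();
        // Joining in spawn order keeps the output aligned with the input.
        handles.into_iter().map(|h| h.join().unwrap()).collect()
    })
}
```

Passing a vector with the names "ADMIN" and "USER" should yield two results in that order, even though the threads themselves may run in any order. Here the threads borrow the inputs rather than taking ownership, which works because the scope joins them before `role_infos` is dropped.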