How to share a mutable object reference between two closures

Hello community,

Is it possible to share a mutable object reference between two closures (each one being a different thread)?

My application has multiple producers that produce data indefinitely and a consumer that runs in parallel, but I cannot work out how to share the data buffer between the threads. Any help is appreciated.

use tokio::task;
use std::{thread, time};
use std::sync::{Arc, Mutex};


struct Data {
    value: i32
}


fn produce(data: &mut Arc<Mutex<Data>>) {
    for i in 0..300 {
        {
            let mut data = data.lock().expect("Could not lock mutex");
            data.value += 1;
            println!("Produced: {}", data.value);
        }
        thread::sleep(time::Duration::from_millis(500));
    }
}

fn consume(data: &mut Arc<Mutex<Data>>) {
    for i in 0..300 {
        {
            let mut data = data.lock().expect("Could not lock mutex");
            if data.value > 0 {
                data.value -= 1;
                println!("Consumed: {}", data.value);
            }
        }
        thread::sleep(time::Duration::from_millis(1000));
    }
}

#[tokio::main]
async fn main() {
    let mut data = Arc::new(Mutex::new(Data{value: 0}));
    let producing = task::spawn_blocking(move ||produce(&mut data));
    let consuming = task::spawn_blocking(move ||consume(&mut data));
}

This code does not compile with the following error:

error[E0382]: use of moved value: `data`
  --> future\./future.rs:39:42
   |
37 |     let mut data = Arc::new(Mutex::new(Data{value: 0}));
   |         -------- move occurs because `data` has type `Arc<std::sync::Mutex<Data>>`, which does not implement the `Copy` trait
38 |     let producing = task::spawn_blocking(move ||produce(&mut data));
   |                                          -------             ---- variable moved due to use in closure
   |                                          |
   |                                          value moved into closure here
39 |     let consuming = task::spawn_blocking(move ||consume(&mut data));
   |                                          ^^^^^^^             ---- use occurs due to use in closure
   |                                          |
   |                                          value used here after move

No, it is not. They are called mutable borrows, but what they really are is exclusive borrows: by definition, a &mut reference gives exclusive access, so two closures cannot both hold one to the same value.

But for sharing data between threads there is a better mechanism: a multi-producer, multi-consumer queue. Have a look at crossbeam-channel. It is battle-tested and is actually used by the standard library to implement its mpsc module.
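As a rough sketch of the channel approach: this uses std::sync::mpsc instead of crossbeam-channel so that it builds without extra dependencies, and run_pipeline and the message format are made-up names for illustration, not anything from your code. Each producer owns a clone of the sender, and the consumer simply drains the receiver.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical helper: spawns `producers` threads that each send
/// `per_producer` messages, then drains the channel on the calling
/// thread and returns everything that was received.
fn run_pipeline(producers: usize, per_producer: usize) -> Vec<String> {
    let (tx, rx) = mpsc::channel::<String>();

    let mut handles = Vec::new();
    for id in 0..producers {
        // Each producer gets its own clone of the sender.
        let tx = tx.clone();
        handles.push(thread::spawn(move || {
            for n in 0..per_producer {
                tx.send(format!("producer {} item {}", id, n))
                    .expect("receiver was dropped");
            }
        }));
    }
    // Drop the original sender so the channel closes once all producers finish.
    drop(tx);

    // The iterator ends when every sender has been dropped.
    let received: Vec<String> = rx.iter().collect();

    for handle in handles {
        handle.join().expect("producer panicked");
    }
    received
}

fn main() {
    let received = run_pipeline(3, 4);
    println!("Consumed {} messages", received.len());
}
```

No Mutex is needed here because the channel itself is the shared buffer. With producers that never stop, the consumer loop would just keep blocking on the receiver instead of collecting into a Vec.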


I can see two problems here:

First, the point of Arc is reference counting: it lets several owners share the same value. Each thread needs its own handle, so you have to clone the Arc first. In your example that would be let clone = Arc::clone(&data), and then you move the clone into the other thread's closure.

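Second, even then &mut clone will only compile if the clone is declared mut, and the mutable reference is unnecessary anyway: Mutex::lock only needs a shared reference, so produce and consume can take &Arc<Mutex<Data>> (or just &Mutex<Data>) instead of &mut Arc<Mutex<Data>>.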
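Putting both points together, a minimal sketch of your example could look like the following. It assumes plain std::thread instead of tokio's spawn_blocking and leaves out the sleeps and prints to stay short; run and the rounds parameters are names I made up for illustration.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

struct Data {
    value: i32,
}

// `Mutex::lock` takes `&self`, so a shared reference is enough here.
fn produce(data: &Mutex<Data>, rounds: usize) {
    for _ in 0..rounds {
        let mut guard = data.lock().expect("Could not lock mutex");
        guard.value += 1;
    }
}

fn consume(data: &Mutex<Data>, rounds: usize) {
    for _ in 0..rounds {
        let mut guard = data.lock().expect("Could not lock mutex");
        if guard.value > 0 {
            guard.value -= 1;
        }
    }
}

/// Hypothetical driver: runs one producer and one consumer thread
/// and returns the value left in `Data` once both have finished.
fn run(produce_rounds: usize, consume_rounds: usize) -> i32 {
    let data = Arc::new(Mutex::new(Data { value: 0 }));

    // Each closure takes ownership of its own clone of the Arc.
    let for_producer = Arc::clone(&data);
    let producing = thread::spawn(move || produce(&for_producer, produce_rounds));

    let for_consumer = Arc::clone(&data);
    let consuming = thread::spawn(move || consume(&for_consumer, consume_rounds));

    producing.join().expect("producer panicked");
    consuming.join().expect("consumer panicked");

    let final_value = data.lock().expect("Could not lock mutex").value;
    final_value
}

fn main() {
    println!("Final value: {}", run(300, 300));
}
```

The final value depends on how the two threads interleave, so it can be anything from 0 up to the number of produce rounds.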

You can check the Book's chapter on shared-state concurrency. If I understood you correctly, that is what you are trying to do: Shared-State Concurrency - The Rust Programming Language

Edit: Although it might be better to use channels instead: Using Message Passing to Transfer Data Between Threads - The Rust Programming Language

Thank you both for the answers. I read the links you provided and after thinking about it, I found a way to handle my case:

use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

const MAX_DATA_PRODUCED: usize = 100;

#[derive(Debug, Default)]
struct Buffer {
    values: Vec<String>,
    total_data_produced: usize
}


fn check_total_data_produced(buffer: &Arc<Mutex<Buffer>>) -> bool {
    buffer.lock().unwrap().total_data_produced < MAX_DATA_PRODUCED
}

fn check_data_to_consume(buffer: &Arc<Mutex<Buffer>>) -> bool {
    !buffer.lock().unwrap().values.is_empty()
}


fn main() {
    let buffer = Arc::new(Mutex::new(Buffer::default()));
    let mut handles = vec![];

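    // Consumer: keep draining until every producer is done and the buffer is empty.
    let consumer = Arc::clone(&buffer);
    let handle = thread::spawn(move || {
        while check_total_data_produced(&consumer) || check_data_to_consume(&consumer) {
            if consumer.lock().unwrap().values.pop().is_some() {
                println!("Consumed...");
            } else {
                // Nothing to consume yet; let other threads run instead of spinning.
                thread::yield_now();
            }
        }
    });
    handles.push(handle);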

    // Producers
    for i in 0..10 {
        let producer = Arc::clone(&buffer);
        let handle = thread::spawn(move || {
            for _ in 0..10 {
                thread::sleep(Duration::from_secs(1));
                {
                    let mut num = producer.lock().unwrap();
                    num.total_data_produced += 1;
                    let message = format!("Produced by {}", i);
                    println!("{}", message);
                    num.values.push(message);
                }
            }
        });
        handles.push(handle);        
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result: {:?}", *buffer.lock().unwrap());
}