[E0621] or [E0495] error when sharing data across threads

Hello!

Like many others, I am butting heads with the borrow checker, and I can't seem to figure out what exactly I should be doing in my case.

I have checked out a few related issues, but what I am doing isn't exactly the same.
I am attempting to process some data in parallel, while using Arc to share the (read only) data across those processes.

I am aware Rust sees sending the Arc and its contents into multiple processes as dangerous, since it "could" outlive the program.
However, it's clear I am immediately joining everything before the end of the function.

How do I convince Rust to let me do this? What changes do I need to make for this to work?

I tried following the compiler's first suggestion, adding a 'static lifetime to &Data, but after trying it and researching it, it's obvious I don't want the data to live for the full length of the program.

The [E0495] came when I tried to use 'a for &Data, which I could not get to work, in several combinations.

There must be a way to do this?

Here is my sample code. It is similar to what I am actually doing, with everything else stripped away:

use std::thread::spawn;
use std::sync::Arc;

extern crate num_cpus;
use num_cpus::get;

struct Reader {
    a_count : usize,
    a_vec : Vec<i64>
}

struct ReaderContainer {
    rdrs : Vec<Box<Reader>>
}

struct Data {
    repeat : usize,
    something : Vec<i64>
}

fn main() {
    let length = a_secondary_function();
    println!(
        "length of Reader vec in container: {}", 
        length
    );
}

fn a_secondary_function() -> usize {
    let data_chunk : Data = get_data();
    a_function_in_the_middle(&data_chunk)
}

fn a_function_in_the_middle(data_chunk : &Data) -> usize {
    let mut container : ReaderContainer = make_container();
    do_a_bunch_in_parallel(&data_chunk, &mut container);
    container.rdrs.len()
}

fn do_a_bunch_in_parallel(data_chunk : &Data, container : &mut ReaderContainer) {

    let shared_data = Arc::new(data_chunk);
    let mut t_handles = vec![];

    for _ in 0 .. get() {
        let arc_data = shared_data.clone();
        let boxed_reader_opt = container.rdrs.pop();
        match boxed_reader_opt {
            Some(boxed_reader) => {
                t_handles.push(
                    spawn(
                        move || { 
                            process(boxed_reader, arc_data) 
                        }
                    )
                );
            }
            None => {
                panic!("//(0 o 0)//");
            }
        }
        
    }
    for handler in t_handles {
        container.rdrs.push(handler.join().unwrap());
    }
}

fn process(data_reader :  Box<Reader>, shared_data : Arc<&Data>) -> Box<Reader>  {
    let mut read : Reader = *data_reader;
    let inner_data : &Data = &shared_data;
    let borrowed_vec : &Vec<i64> = &shared_data.something;
    for i in 0 .. read.a_vec.len() {
      read.a_vec[i] = borrowed_vec[i];
    }
    let mut a : usize = 0;
    // simulates doing a bunch of stuff.
    for k in 0 .. shared_data.repeat {
        a += k;
    }
    read.a_count = a;
    Box::new(read)
}



fn get_data() -> Data {
    Data {
        repeat : 100000000,
        something : vec![1; 10]
    }
}

fn make_Reader() -> Reader {
    Reader {
        a_count : 0,
        a_vec : vec![10; 10]
    }
}

fn make_container() -> ReaderContainer {
    let mut readers : Vec<Box<Reader>> = Vec::new();
    for i in 0 .. get() {
        readers.push(
            Box::new(
                make_Reader()
            )
        );
    }
    ReaderContainer {
        rdrs : readers
    }
}


Errors:

   Compiling playground v0.0.1 (/playground)
error[E0621]: explicit lifetime required in the type of `data_chunk`
  --> src/main.rs:51:21
   |
40 | fn do_a_bunch_in_parallel(data_chunk : &Data, container : &mut ReaderContainer) {
   |                                        ----- help: add explicit lifetime `'static` to the type of `data_chunk`: `&'static Data`
...
51 |                     spawn(
   |                     ^^^^^ lifetime `'static` required

error: aborting due to previous error

For more information about this error, try `rustc --explain E0621`.
error: could not compile `playground`

To learn more, run the command again with --verbose.

Never mind. I did more research on the std spawn function and realized I was asking for the impossible.

So I looked at crossbeam and found the following solution, which works just fine:

use std::thread::spawn;
use std::sync::Arc;

extern crate num_cpus;
use num_cpus::get;

extern crate crossbeam;
use crossbeam::scope;

struct Reader {
    a_count : usize,
    a_vec : Vec<i64>
}

struct ReaderContainer {
    rdrs : Vec<Box<Reader>>
}

struct Data {
    repeat : usize,
    something : Vec<i64>
}

fn main() {
    let length = a_secondary_function();
    println!(
        "length of Reader vec in container: {}", 
        length
    );
}

fn a_secondary_function() -> usize {
    let data_chunk : Data = get_data();
    a_function_in_the_middle(&data_chunk)
}

fn a_function_in_the_middle(data_chunk : &Data) -> usize {
    let mut container : ReaderContainer = make_container();
    do_a_bunch_in_parallel(&data_chunk, &mut container);
    for i in 0 .. container.rdrs.len() {
        println!("modified value: {} for {}", container.rdrs[i].a_count, i);
    }
    container.rdrs.len()
}

fn do_a_bunch_in_parallel(data_chunk : &Data, container : &mut ReaderContainer) {

    let shared_data = Arc::new(data_chunk);
    let mut t_handles = vec![];

    scope(
        |scope| {
            for _ in 0 .. get() {
                let arc_data = shared_data.clone();
                let boxed_reader_opt = container.rdrs.pop();
                match boxed_reader_opt {
                    Some(boxed_reader) => {
                        t_handles.push(scope.spawn(
                            move || { 
                                process(boxed_reader, arc_data) 
                            }
                        )
                      );
                    }
                    None => {
                        panic!("//(0 o 0)//");
                    }
                }
            }
        }
    );
    
    for handler in t_handles {
        container.rdrs.push(handler.join());
    }
}

fn process(data_reader :  Box<Reader>, shared_data : Arc<&Data>) -> Box<Reader>  {
    let mut read : Reader = *data_reader;
    let inner_data : &Data = &shared_data;
    let borrowed_vec : &Vec<i64> = &shared_data.something;
    for i in 0 .. read.a_vec.len() {
      read.a_vec[i] = borrowed_vec[i];
    }
    let mut a : usize = 0;
    // simulates doing a bunch of stuff.
    for k in 0 .. shared_data.repeat {
        a += k;
    }
    read.a_count = a;
    Box::new(read)
}



fn get_data() -> Data {
    Data {
        repeat : 100000000,
        something : vec![1; 10]
    }
}

fn make_Reader() -> Reader {
    Reader {
        a_count : 0,
        a_vec : vec![10; 10]
    }
}

fn make_container() -> ReaderContainer {
    let mut readers : Vec<Box<Reader>> = Vec::new();
    for i in 0 .. get() {
        readers.push(
            Box::new(
                make_Reader()
            )
        );
    }
    ReaderContainer {
        rdrs : readers
    }
}
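
For anyone reading this on Rust 1.63 or later: the standard library has std::thread::scope, which gives the same guarantee as the crossbeam scope above (every spawned thread is joined before the scope returns), so a plain &Data can be borrowed without Arc or an extra crate. A minimal sketch, assuming simplified versions of Reader and Data and a plain Vec<Reader> in place of ReaderContainer; it is an illustration, not the original code:

```rust
use std::thread;

struct Data {
    repeat: usize,
    something: Vec<i64>,
}

struct Reader {
    a_count: usize,
    a_vec: Vec<i64>,
}

// Takes a plain borrow: no 'static bound is involved with scoped threads.
fn process(mut reader: Reader, data: &Data) -> Reader {
    for (dst, src) in reader.a_vec.iter_mut().zip(&data.something) {
        *dst = *src;
    }
    // Simulates doing a bunch of stuff.
    reader.a_count = (0..data.repeat).sum();
    reader
}

fn do_a_bunch_in_parallel(data: &Data, readers: &mut Vec<Reader>) {
    // Take the readers out so that each thread can own one.
    let taken: Vec<Reader> = readers.drain(..).collect();
    // thread::scope joins every spawned thread before it returns,
    // which is what allows the closures to borrow `data`.
    let processed: Vec<Reader> = thread::scope(|s| {
        let handles: Vec<_> = taken
            .into_iter()
            .map(|reader| s.spawn(move || process(reader, data)))
            .collect();
        handles.into_iter().map(|h| h.join().unwrap()).collect()
    });
    readers.extend(processed);
}
```

Because the borrow never leaves the scope, do_a_bunch_in_parallel can be called repeatedly in a loop with the same &Data.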

Just a first impression from skimming your code, but Arc<&Data> is almost certainly not what you want. Arc provides shared ownership, so you should work with Arc<Data> instead, .clone()ing whenever you need a new "reference" (such as for passing to another thread). The whole point of Arc (see the documentation) is that calling .clone() doesn't do a deep copy of the data, it just increments a reference count and gives you a new smart pointer to the original data.
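
To make the "no deep copy" point concrete, here is a small sketch. The function name clone_is_shallow is made up for illustration, and Data is cut down to one field:

```rust
use std::sync::Arc;

struct Data {
    something: Vec<i64>,
}

// Returns (strong count after one clone, whether both handles see the same Vec buffer).
fn clone_is_shallow() -> (usize, bool) {
    let original = Arc::new(Data { something: vec![1; 10] });
    // Equivalent to original.clone(): only the reference count changes.
    let handle = Arc::clone(&original);
    // Comparing the buffer addresses shows the Vec was not copied.
    let same_buffer = original.something.as_ptr() == handle.something.as_ptr();
    (Arc::strong_count(&original), same_buffer)
}
```

The count goes from 1 to 2 and both handles point at the same underlying Vec buffer.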

Also: the thing that you create with spawn is called a "thread", not a "process"; "process" means something different. I've edited the title of your post to reduce confusion.

I'll do some research / reading on threads then. I was under the impression that when you spawn a system thread, on a separate core, you were supposed to call that a process.

As for the use of &Data, do you mean that

do_a_bunch_in_parallel(&data_chunk, &mut container);

should be

do_a_bunch_in_parallel(data_chunk, &mut container);

? If so, I think I agree, that looks like a mistake on my part.
If not, I should mention that this code is a contrived example of what I'm actually doing:
do_a_bunch_in_parallel is actually called in a loop, and I don't want to move data_chunk; I really do just want to borrow it.
data_chunk also gets passed to at least one other function.
data_chunk is always read-only; I'm not mutating it.

Do you mean change &Data to Data for all function calls?

If so I'm not sure if I agree with that, given that I am re-using the data_chunk multiple times.

Thanks

I'd suggest changing the type of the data_chunk argument of do_a_bunch_in_parallel to Arc<Data>; then it can clone data_chunk whenever it needs a new "borrow" (again, this is cheap). You can create the original Arc<Data> in a_secondary_function:

fn a_secondary_function() -> usize {
    let data_chunk: Data = get_data();
    a_function_in_the_middle(Arc::new(data_chunk))
}

If you have data_chunk: Arc<Data> you can call methods of Data on data_chunk directly, or get a &Data with data_chunk.as_ref().
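
Roughly, that could look like the sketch below. The total method and the workers parameter are made up for illustration (your Data has no methods in the posted code), and the per-thread work is a placeholder:

```rust
use std::sync::Arc;
use std::thread;

struct Data {
    repeat: usize,
    something: Vec<i64>,
}

impl Data {
    // Hypothetical method, only here to show a call through the Arc.
    fn total(&self) -> i64 {
        self.something.iter().sum()
    }
}

fn do_a_bunch_in_parallel(data_chunk: Arc<Data>, workers: usize) -> Vec<i64> {
    let handles: Vec<_> = (0..workers)
        .map(|_| {
            // Cheap: bumps the reference count, does not copy Data.
            let data = Arc::clone(&data_chunk);
            // Arc<Data> owns its contents, so it satisfies spawn's 'static bound.
            thread::spawn(move || {
                let plain: &Data = data.as_ref();
                plain.total() + plain.repeat as i64
            })
        })
        .collect();
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}
```

If the call sits in a loop, the caller keeps its own Arc<Data> and passes Arc::clone(&data_chunk) on each iteration, so the data is never moved away and is still only read.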

A process has its own address space, while distinct threads spawned in the same process share an address space. This is why threads can communicate using shared-memory mechanisms like Arc.

(Also a spawned thread does not get its own separate core; while the process with your code is running, the OS schedules the threads onto available cores as it sees fit. This is why you can spawn 200 threads even if your machine has only 4 cores.)
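
A quick way to see both points, as a sketch (spawn_many is a made-up name): many more threads than cores, all updating one counter that lives in the shared address space.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

// Spawns `n` threads that each increment one shared counter, then returns its value.
fn spawn_many(n: usize) -> usize {
    let counter = Arc::new(AtomicUsize::new(0));
    let handles: Vec<_> = (0..n)
        .map(|_| {
            let counter = Arc::clone(&counter);
            thread::spawn(move || {
                // Every thread writes to the same memory location.
                counter.fetch_add(1, Ordering::Relaxed);
            })
        })
        .collect();
    // Joining synchronizes with each thread, so all increments are visible afterwards.
    for h in handles {
        h.join().unwrap();
    }
    counter.load(Ordering::Relaxed)
}
```

Calling spawn_many(200) works on a 4-core machine; the OS simply time-slices the threads across the cores it has.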
