As my first project, I wanted to write a command-line script that takes a CSV file and calculates some z-scores without loading the whole file into memory. Therefore, the script reads the CSV multiple times.
Even though it was single-threaded, it was not particularly slow. However, I wanted to speed it up by using all available threads.
For the first iteration through the CSV, I got multi-threading up and running. You can find the code for reference here.
For the second iteration, however, I failed to get it running, even though I tried to follow similar posts. The current failing code can be found here.
To parallelize the second iteration through the CSV, each thread needs to read two HashMaps built during the first iteration (read-only, just for lookups). All of the errors I get are about moves and borrows.
error[E0716]: temporary value dropped while borrowed
--> src\main.rs:196:47
|
196 | let counts_shared = Arc::new(RwLock::new(&counts.clone()));
| ^^^^^^^^^^^^^^ - temporary value is freed at the end of this statement
| |
| creates a temporary value which is freed while still in use
...
213 | threads.push(std::thread::spawn(move|| {
| ______________________-
214 | | get_total_deltas_subset(&f_name, &groupby_col,
215 | | &count_col, &thread_idx,
216 | | available_cores.clone(), &counts_shared_clone.read().unwrap(),
217 | | &col_indices_shared_clone.read().unwrap())
218 | | }));
| |__________- argument requires that borrow lasts for `'static`
error[E0716]: temporary value dropped while borrowed
--> src\main.rs:199:52
|
199 | let col_indices_shared = Arc::new(RwLock::new(&col_indices.clone()));
| ^^^^^^^^^^^^^^^^^^^ - temporary value is freed at the end of this statement
| |
| creates a temporary value which is freed while still in use
...
213 | threads.push(std::thread::spawn(move|| {
| ______________________-
214 | | get_total_deltas_subset(&f_name, &groupby_col,
215 | | &count_col, &thread_idx,
216 | | available_cores.clone(), &counts_shared_clone.read().unwrap(),
217 | | &col_indices_shared_clone.read().unwrap())
218 | | }));
| |__________- argument requires that borrow lasts for `'static`
error[E0382]: use of moved value: `counts_shared_clone`
--> src\main.rs:213:41
|
197 | let counts_shared_clone = counts_shared.clone();
| ------------------- move occurs because `counts_shared_clone` has type `Arc<RwLock<&HashMap<String, (i32, rust_decimal::Decimal)>>>`, which does not implement the `Copy` trait
...
213 | threads.push(std::thread::spawn(move|| {
| ^^^^^^ value moved into closure here, in previous iteration of loop
...
216 | available_cores.clone(), &counts_shared_clone.read().unwrap(),
| ------------------- use occurs due to use in closure
error[E0382]: use of moved value: `col_indices_shared_clone`
--> src\main.rs:213:41
|
200 | let col_indices_shared_clone = col_indices_shared.clone();
| ------------------------ move occurs because `col_indices_shared_clone` has type `Arc<RwLock<&HashMap<String, usize>>>`, which does not implement the `Copy` trait
...
213 | threads.push(std::thread::spawn(move|| {
| ^^^^^^ value moved into closure here, in previous iteration of loop
...
217 | &col_indices_shared_clone.read().unwrap())
| ------------------------ use occurs due to use in closure
I understand that I cannot simply copy my HashMaps into the threads, because their data lives on the heap, so they have to be moved. However, I thought I could still clone them into each thread and thus grant read-only access without moving anything.
What am I missing that keeps this from compiling? (Apologies for the horrible code quality; I'm still figuring out how things work.)
The main problem here is the ampersand. Your code clones counts, and then puts a reference to the clone inside the RwLock. Remove the ampersand so you are putting the clone into the RwLock, rather than a reference to the clone.
Note that &counts.clone() is parsed as &(counts.clone()) rather than as (&counts).clone().
Additionally, if you are only going to read from it, you don't need an RwLock; just use an Arc:
let counts_shared = Arc::new(counts.clone());
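A minimal sketch of why the ampersand matters, assuming a plain HashMap<String, i32> as a stand-in for counts. The require_send_static and share_owned helpers are hypothetical: require_send_static only mimics the Send + 'static bound that std::thread::spawn places on its closure. The owned clone passes that bound; the commented-out reference version should be rejected with the same kind of "temporary value dropped while borrowed" error as in the question.

```rust
use std::collections::HashMap;
use std::sync::Arc;

// Hypothetical helper: accepts only Send + 'static values, the same bounds
// std::thread::spawn places on its closure (and therefore on what it captures).
fn require_send_static<T: Send + 'static>(value: T) -> T {
    value
}

// counts.clone() is an owned HashMap, so the Arc owns its data outright
// and the value satisfies the 'static bound.
fn share_owned(counts: &HashMap<String, i32>) -> Arc<HashMap<String, i32>> {
    require_send_static(Arc::new(counts.clone()))
    // By contrast, &counts.clone() parses as &(counts.clone()): a reference to a
    // temporary that is dropped at the end of the statement. The resulting
    // Arc<&HashMap<String, i32>> is not 'static, so this would be rejected:
    // require_send_static(Arc::new(&counts.clone()))
}

fn main() {
    let mut counts = HashMap::new();
    counts.insert("a".to_string(), 1);
    let shared = share_owned(&counts);
    println!("{}", shared["a"]); // prints 1
}
```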
Furthermore, you need to clone the Arc once for each thread, like this:
for thread_idx in range {
    let counts_shared_clone = counts_shared.clone();
    threads.push(std::thread::spawn(move|| { ... }));
}
Your current code tries to reuse the same clone in several threads, which is not possible.
Finally, since you never modify counts after putting it in an Arc, you can move it into the Arc and avoid the clone altogether:
let counts_shared = Arc::new(counts);
Values in an Arc are immutable, but that doesn't matter to you, since you only read them.
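Putting these points together, here is a self-contained sketch that shares two read-only structures across std::thread::spawn threads, much like counts and col_indices in the question. The row list, the subset_total and parallel_total functions, and the partitioning by row index are hypothetical stand-ins for reading the CSV and for get_total_deltas_subset; the sharing pattern (no RwLock, values moved into Arcs, one Arc clone per thread made in the loop body) is the part that carries over.

```rust
use std::collections::HashMap;
use std::sync::Arc;
use std::thread;

// Hypothetical stand-in for get_total_deltas_subset: thread `thread_idx` takes
// every `num_threads`-th row and sums the count stored for that row's group.
// Panics if a group is missing from `counts` or if num_threads is 0.
fn subset_total(thread_idx: usize, num_threads: usize, rows: &[String], counts: &HashMap<String, i32>) -> i32 {
    rows.iter()
        .skip(thread_idx)
        .step_by(num_threads)
        .map(|group| counts[group])
        .sum()
}

fn parallel_total(rows: Vec<String>, counts: HashMap<String, i32>, num_threads: usize) -> i32 {
    // rows and counts are only read from now on, so move them into the Arcs
    // instead of cloning them. No RwLock is needed for read-only access.
    let rows_shared = Arc::new(rows);
    let counts_shared = Arc::new(counts);

    let mut threads = Vec::new();
    for thread_idx in 0..num_threads {
        // One clone per thread, made in the loop body before the move closure
        // captures it, so rows_shared and counts_shared stay usable next iteration.
        let rows_clone = Arc::clone(&rows_shared);
        let counts_clone = Arc::clone(&counts_shared);
        threads.push(thread::spawn(move || {
            subset_total(thread_idx, num_threads, &rows_clone, &counts_clone)
        }));
    }

    // join() returns Err only if the thread panicked.
    threads.into_iter().map(|t| t.join().unwrap()).sum()
}

fn main() {
    let rows: Vec<String> = vec!["a", "b", "a", "c", "b"].into_iter().map(String::from).collect();
    let mut counts = HashMap::new();
    counts.insert("a".to_string(), 1);
    counts.insert("b".to_string(), 10);
    counts.insert("c".to_string(), 100);
    println!("{}", parallel_total(rows, counts, 3)); // prints 122
}
```

Arc::clone(&x) does the same thing as x.clone() on an Arc; spelling it out makes it clear that only the reference count is incremented and the HashMap itself is never copied.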
Since you're waiting for all of the threads to finish anyway, you may want to look at std::thread::scope, which allows you to spawn threads that borrow from the thread that creates them.
// Iterate 2nd time through rows to get standard deviation of reference categories of zscores
let range: Vec<u32> = (0..available_cores).collect();
let mut results = Vec::new();
std::thread::scope(|s| {
    let mut threads = Vec::new();
    for thread_idx in range {
        let f_name = filename.clone();
        let groupby_col = env::args().nth(1).expect("groupby_col not provided");
        let count_col = env::args().nth(2).expect("count_col not provided");
        threads.push(s.spawn({
            // Borrow the maps instead of moving them; the move closure below
            // captures only these references.
            let counts = &counts;
            let col_indices = &col_indices;
            move || {
                get_total_deltas_subset(
                    &f_name,
                    &groupby_col,
                    &count_col,
                    &thread_idx,
                    available_cores,
                    counts,
                    col_indices,
                )
            }
        }));
    }
    for thread in threads {
        // join() returns a Result; extend() adds the thread's return value
        // and silently skips the Err of a thread that panicked.
        results.extend(thread.join());
    }
});
Thanks for your fast reply. I tried to adjust the code like this:
let counts_shared = Arc::new(counts.clone());
let col_indices_shared = Arc::new(col_indices.clone());
// Iterate 2nd time through rows to get standard deviation of reference categories of zscores
let range: Vec<u32> = (0..available_cores).collect();
let mut results = Vec::new();
let mut threads = Vec::new();
for thread_idx in range {
    let f_name = filename.clone();
    let groupby_col = env::args().nth(1).expect("groupby_col not provided");
    let count_col = env::args().nth(2).expect("count_col not provided");
    threads.push(std::thread::spawn(move|| {
        let counts_shared_clone = counts_shared.clone();
        let col_indices_shared_clone = col_indices_shared.clone();
        get_total_deltas_subset(&f_name, &groupby_col,
            &count_col, &thread_idx,
            available_cores.clone(), &counts_shared_clone,
            &col_indices_shared_clone)
    }));
}
for thread in threads {
    results.extend(thread.join());
};
Now only errors about moved values are left:
error[E0382]: use of moved value: counts_shared
--> src\main.rs:210:41
|
196 | let counts_shared = Arc::new(counts.clone());
| ------------- move occurs because counts_shared has type Arc<HashMap<String, (i32, rust_decimal::Decimal)>>, which does not implement the Copy trait
...
210 | threads.push(std::spawn(move|| {
| ^^^^^^ value moved into closure here, in previous iteration of loop
211 | let counts_shared_clone = counts_shared.clone();
| ------------- use occurs due to use in closure
error[E0382]: use of moved value: col_indices_shared
--> src\main.rs:210:41
|
197 | let col_indices_shared = Arc::new(col_indices.clone());
| ------------------ move occurs because col_indices_shared has type Arc<HashMap<String, usize>>, which does not implement the Copy trait
...
210 | threads.push(std::spawn(move|| {
| ^^^^^^ value moved into closure here, in previous iteration of loop
211 | let counts_shared_clone = counts_shared.clone();
212 | let col_indices_shared_clone = col_indices_shared.clone();
| ------------------ use occurs due to use in closure
For more information about this error, try rustc --explain E0382.
Why is cloning not possible here, and how can I solve this? I don't see why a move is forced.
I updated the code here (this includes some other minor changes).
The problem here is that a move closure moves the values it captures when it is created, long before it is ever called. That means the next loop iteration can no longer access counts_shared. The fix is to clone the Arc before the closure captures it. Usually this is done in a block that returns the closure:
let counts_shared = Arc::new(counts);
let col_indices_shared = Arc::new(col_indices);
// Iterate 2nd time through rows to get standard deviation of reference categories of zscores
let range: Vec<u32> = (0..available_cores).collect();
let mut results = Vec::new();
let mut threads = Vec::new();
for thread_idx in range {
    let f_name = filename.clone();
    let groupby_col = env::args().nth(1).expect("groupby_col not provided");
    let count_col = env::args().nth(2).expect("count_col not provided");
    threads.push(std::thread::spawn({
        // Create clones here, just before they are moved into the closure
        let counts_shared_clone = counts_shared.clone();
        let col_indices_shared_clone = col_indices_shared.clone();
        // Closure takes our new clones, and the old value is available for the next loop iteration.
        // The closure is returned from the block and passed to std::thread::spawn just like before
        move || {
            get_total_deltas_subset(
                &f_name,
                &groupby_col,
                &count_col,
                thread_idx,
                available_cores,
                &counts_shared_clone,
                &col_indices_shared_clone,
            )
        }
    }));
}
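A small sketch that makes the capture timing visible, with a hypothetical capture_demo function and a Vec<i32> standing in for the HashMaps: Arc::strong_count rises as each closure is created, before any closure has been handed to a thread or run.

```rust
use std::sync::Arc;
use std::thread;

// Hypothetical demo: records the Arc's strong count right after each closure
// is created, then runs the closures on threads and collects their results.
fn capture_demo(n: usize) -> (Vec<usize>, Vec<usize>) {
    let data = Arc::new(vec![10, 20, 30]);
    let mut closures = Vec::new();
    let mut strong_counts = Vec::new();
    for _ in 0..n {
        closures.push({
            // The clone is made here and moved into the closure the moment the
            // closure is created. `data` itself is never moved, so the next
            // iteration can clone it again. Capturing `data` directly would
            // move it on the first iteration (error E0382 in the question).
            let data_clone = Arc::clone(&data);
            move || data_clone.len()
        });
        // None of the closures has run yet, but each already owns its clone.
        strong_counts.push(Arc::strong_count(&data));
    }
    // Only now are the closures handed to threads and executed.
    let handles: Vec<_> = closures.into_iter().map(thread::spawn).collect();
    let lens = handles.into_iter().map(|h| h.join().unwrap()).collect();
    (strong_counts, lens)
}

fn main() {
    let (strong_counts, lens) = capture_demo(3);
    println!("{:?} {:?}", strong_counts, lens); // prints [2, 3, 4] [3, 3, 3]
}
```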
It's not the Arc that changes things; it's the difference between scope and std::thread::spawn. When you call the normal spawn, the calling function could return (or the calling thread could exit) immediately afterwards. That would be bad if the new thread held a reference to data on the old thread's stack. scope solves this problem by forcing all of the threads spawned through the scope handle (s in my example) to finish before the call to scope returns. This ensures that any borrowed data the new threads use still exists until they exit.
Since you were already waiting for all of the threads, scope was the perfect fit for your problem! Often, though, it isn't practical to wait for the spawned threads like that, so knowing how to work with Arc clones is still valuable.
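For comparison, a minimal sketch of the scoped approach (std::thread::scope requires Rust 1.63 or newer). The scoped_total function and the row/count data are hypothetical stand-ins for the CSV pass; the point is that the threads borrow rows and counts directly, with no Arc, and the data is still owned and usable by the caller after the scope returns.

```rust
use std::collections::HashMap;
use std::thread;

// Hypothetical stand-in for the per-thread CSV pass: thread `thread_idx` takes
// every `num_threads`-th row (num_threads must be at least 1) and sums the
// count stored for that row's group. Panics if a group is missing from `counts`.
fn scoped_total(rows: &[String], counts: &HashMap<String, i32>, num_threads: usize) -> i32 {
    thread::scope(|s| {
        let mut handles = Vec::new();
        for thread_idx in 0..num_threads {
            // `rows` and `counts` are plain references; the move closure copies
            // them. std::thread::spawn would reject this (it needs 'static data),
            // but s.spawn only needs the data to outlive the scope.
            handles.push(s.spawn(move || {
                rows.iter()
                    .skip(thread_idx)
                    .step_by(num_threads)
                    .map(|group| counts[group])
                    .sum::<i32>()
            }));
        }
        // Every thread spawned through `s` is joined before thread::scope
        // returns; joining explicitly here also collects the results.
        handles.into_iter().map(|h| h.join().unwrap()).sum()
    })
}

fn main() {
    let rows: Vec<String> = vec!["a", "b", "a", "c", "b"].into_iter().map(String::from).collect();
    let mut counts = HashMap::new();
    counts.insert("a".to_string(), 1);
    counts.insert("b".to_string(), 10);
    counts.insert("c".to_string(), 100);

    let total = scoped_total(&rows, &counts, 3);
    // The borrowed data is still owned by main and usable here.
    println!("{} from {} groups", total, counts.len()); // prints "122 from 3 groups"
}
```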