Spawning multiple threads with rayon and waiting until all threads finish

In the following example I tried to create a group of threads limited to 4; the threads should work through the whole for loop and speed up what's inside it. But the more threads I use, the more files the writer skips, and I don't know how to solve it.

use std::sync::RwLock;
use static_init::dynamic;
use std::time::Instant;

#[dynamic(lazy)]
static TESTCOUNT: RwLock<Counter> = RwLock::new(Counter::init(0));

fn main() {
    let pool = rayon::ThreadPoolBuilder::new().num_threads(4).build().unwrap();
    let n_jobs = 200;
    let now = Instant::now();
    pool.scope(|scope| {
        for _ in 0..n_jobs {
            scope.spawn(|_| resize_vec());
        }
    });
    let actualtime = now.elapsed();
    println!("{}", actualtime.as_millis());
    println!("{}", TESTCOUNT.read().unwrap().count);
}

fn resize_vec() {
    TESTCOUNT.write().unwrap().count += 1;
    let mut _line = String::new();
    for _ in 0..10000000 {
        _line.push('a');
    }
    std::fs::write("files/".to_owned() + &TESTCOUNT.read().unwrap().count.to_string(), _line.clone()).unwrap();
}

struct Counter {
    count: i32,
}

impl Counter {
    pub fn init(count: i32) -> Self {
        Self { count }
    }
}

Get rid of the mutex. Use an atomic counter. Your read lock blocks your write locks and vice versa; you're just context switching uselessly here. Not sure if this is THE problem, but it should be fixed either way.
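
A minimal sketch of that change, assuming the counter only needs to hand out unique file indices (Ordering::Relaxed suffices for that; the rest of resize_vec is kept as in the original):

use std::sync::atomic::{AtomicUsize, Ordering};

// No lock needed: the atomic hands every caller a unique index.
static TESTCOUNT: AtomicUsize = AtomicUsize::new(0);

fn resize_vec() {
    // fetch_add returns the value *before* the increment, so each
    // thread keeps its own index even when increments interleave.
    let id = TESTCOUNT.fetch_add(1, Ordering::Relaxed);
    let mut line = String::new();
    for _ in 0..10_000_000 {
        line.push('a');
    }
    std::fs::write(format!("files/{}", id), line).unwrap();
}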

Another minor nitpick: presize the string. It's allocating, freeing, and copying dozens of times (if not hundreds). Maybe the Rust compiler is smart enough to detect this use case.
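
For instance, a sketch of the presized version:

// Reserve all 10 MB up front so push never reallocates and copies.
let mut line = String::with_capacity(10_000_000);
for _ in 0..10_000_000 {
    line.push('a');
}
// Equivalent one-allocation form: let line = "a".repeat(10_000_000);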

I'm not 100% sure, but mutex locks tend to have a scope that lasts until the end of the block. What's confusing to me is that if this were true, your reader would block forever.
I haven't used rayon thread pools (just their par_iter), but in crossbeam the threads get messages and block until all scopes complete. Does this program exit? If so, I'd be surprised to see missing jobs. Could the unwrap be panicking and not logging?

I added the mutex only because if I increased the count without it, with 2 threads it skipped a lot of counting, and the more threads I use, the fewer files are written at the end.

If I set the thread pool to use 1 thread, my test writes as many files as I created jobs, but if I set num_threads to 2, for example, only half of the files are written.

Looking at this again, I definitely think you should change the counter. Either you are blocking it for the whole function call (effectively removing any parallelism), or you are reading a collapsed value (e.g. several adds happen all at once, and then they all read the same value at the end).

Not at a computer, but search for std::sync::atomic::AtomicUsize or some such thing, and make sure to add and retain the modified value at the beginning of the function.

I tried to delete the mutex, and then my test wrote file 0 and skipped all the others when I had the thread count set to 2.

You're updating the counter with a write lock, but then much later you're using a separate read lock to read the counter. It may have been modified by other threads as well in the meantime. Like:

  • Thread 1 writes count += 1 (count == 1)
  • Thread 2 writes count += 1 (count == 2)
  • Thread 1 finishes its work, reads count 2, and writes that file name.
  • Thread 2 finishes its work, reads count 2, and writes the same file name.

There are ways to fix that kind of race, but you could instead use a parallel iterator for the count:

(0..n_jobs).into_par_iter().for_each(|i| {
    // Calculate and write file `i`
});
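
Expanded into a self-contained sketch of the whole program (the 4-thread pool, job count, and file names mirror the original example):

use rayon::prelude::*;

fn main() {
    let pool = rayon::ThreadPoolBuilder::new().num_threads(4).build().unwrap();
    let n_jobs = 200;
    pool.install(|| {
        // Each job owns its index `i`, so no shared counter is needed
        // and every file name is written exactly once.
        (0..n_jobs).into_par_iter().for_each(|i| {
            let line = "a".repeat(10_000_000);
            std::fs::write(format!("files/{}", i), line).unwrap();
        });
    });
}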

I created this example because in my original project I will have an aes_gcm_siv encryptor, which will read the files, encrypt them, and write the result back to the same files. In Python I used thread workers for this, but I don't know how to do it here so that 4 threads help speed up the process.

At this link is my original code, where I just changed the keys so that they are not the same as the ones I actually use; there I would need to use thread workers to speed up the encrypting.
(Playground)

Ok, well that code doesn't have anything like the counter problem -- it looks like that should work.

Yes, I was just trying to fix everything and apply your advice, but do you think the code is good now? Because if I switch num_threads between 1 and 4, I don't see any performance difference.

When you run with one CPU, does that max out at 100%? (as viewed in your OS task manager)

It sounds like you're I/O bound: that one CPU is fast enough to keep up with the file delays for reading and writing.

No, I have 8 threads and I am near 100% if I use 7 or 8 threads, but maybe my SSD is not fast enough to do the operations any faster.

I tried multithreading the part where I read from the files, and it turned out to work fast, so thank you for your advice.

At the end of this solution I wanted to ask whether you know how I can collect strings from this example.
(Playground)

Rayon's spawn doesn't return a handle, so your thread_handles is collecting the empty unit type, ().

For that particular example, I would have written:

use rayon::prelude::*;

//...
    strings.par_iter_mut().for_each(child_job);

If you have a more general case where you're not just modifying in-place, parallel iterators can also map() and collect() just like regular sequential iterators. You can also manage this with your scope and spawn design by creating a Mutex<Vec<String>> outside the scope, then lock and push each value into that. As long as your parallel jobs have enough computation to do independently, then a quick lock at the end shouldn't be too bad for parallelism.
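
A sketch of that scope-and-spawn variant with a shared Mutex<Vec<String>> (the task body is just a placeholder):

use std::sync::Mutex;

fn main() {
    let results = Mutex::new(Vec::new());
    rayon::scope(|scope| {
        let results = &results; // a shared reference each task can move in
        for i in 0..10 {
            scope.spawn(move |_| {
                // Do the expensive work outside the lock...
                let value = format!("result {}", i);
                // ...then hold the lock only long enough to push.
                // Note: push order is nondeterministic.
                results.lock().unwrap().push(value);
            });
        }
    });
    println!("collected {} strings", results.into_inner().unwrap().len());
}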

At the link at the end of this message is the final version of what I would like to do, so I would like to ask whether you would mind helping me adjust it so that it works.
(Playground)

Something like:

let test: Vec<String> = list.par_iter()
    .map(|entry| create_hash(entry))
    .collect();
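
As a self-contained sketch, with create_hash as a hypothetical stand-in for the function in the playground:

use rayon::prelude::*;

// Hypothetical stand-in for the real hashing function.
fn create_hash(entry: &str) -> String {
    format!("hash of {}", entry)
}

fn main() {
    let list = vec!["a".to_string(), "b".to_string(), "c".to_string()];
    // Unlike pushing into a Mutex<Vec<_>>, collect() preserves input order.
    let test: Vec<String> = list.par_iter()
        .map(|entry| create_hash(entry))
        .collect();
    println!("{:?}", test);
}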

And if I would like to have that inside a scope, as I did before, do you think you could show me that too?
