How to stop executing loop waiting for parallel thread?

Hi,

I've got for example 50 large txt files which I have to read line by line . All 50 files have the SAME number of rows .

I want to :

-- read (in parallel way ) first line of each file and do some parallel operation (operation on the first file record is a one thread)
-- when all operations are done in parallel way, I have to read (parallel) second line from each files and do some operation in parallel
-- and so on ...

How to do it ?

For example when I have only one file :

    let file = File::open(filepath)?;
    let reader = BufReader::new(file);
    
    let mut range = reader.lines();  

    loop {
        match range.next() {
           Some(x) => {
             // do some operation 
         },
         None => { break }
     }
    }

But I need to hang executing loop until all parallels operations (from other files) will be done .

Thanks in advance for any hints .

If you create a thread, in most libraries (including Rust's std), you get back a handle, which you can usually join(). You can collect all your thread handles in a Vec and then iterate over it to join them all.

You can use an std::sync::Barrier to block every thread that reaches the barrier until all of them have waited at the barrier. That may not be the most efficient way to handle this though

1 Like

You could step through them with rayon:

while ranges.par_iter_mut().all(|range| {
       match range.next() {
          Some(x) => {
            // do some operation 
            true
        },
        None => { false }
    }
}) {} // empty `while` body

Hi , thanks for all hints. Exploring threads in rust , I've done something like this :

fn files() -> Vec<&'static str>{
   return vec!["FILE1.txt", "FILE2.txt", "FILE3.txt"]
} 

fn main() -> Result<() , Box<dyn std::error::Error>> {

  let now = Instant::now();
  let mut record_iterators = HashMap::new();

  for my_file in files() {
    let file = File::open(my_file)?;
    let reader = BufReader::new(file);
    let range = reader.lines();  
    record_iterators.insert(my_file.to_string(), Mutex::new(range));
    
  }
  let database = Arc::new(Mutex::new(record_iterators)); 
  for _ in 0..2000000{ //number of records on each file
      let record_handles =
           files() 
          .into_iter()
          .map(|i| {
                     let cloned_ref = database.clone();
                     std::thread::spawn(move || {
                            let x  =  cloned_ref.lock().unwrap()
                                         .get(i).unwrap().lock().unwrap()
                                         .next().unwrap();
                            // do some operations on variable x ;
                      })
          }
          ).collect::<Vec<_>>();

        record_handles.into_iter().for_each(|h| h.join().unwrap());
}
  let elapsed = now.elapsed();
  println!("Elapsed: {:.2?}", elapsed);
  Ok(())
}

Now it works well .
Regards

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.