Hello,
Let's say I have a run_application()
function that executes an external utility and collects its output. So basically, it does std::process::Command::output()
. The code works great when run_application()
is called normally, from the main thread. But to take advantage of the multiple cores of my computer, and to speed things up, I would like to have several instances of run_application()
running in parallel, processing each a different file. For this purpose, I'm creating 3 threads, which each call run_application()
on their own. I suppose this is the standard approach (correct me if I'm wrong).
But for some reason, std::process::Command::output()
inside run_application()
blocks the execution until the current thread is finished. In other words, only 1 thread is working at a time, even though I'm creating 3 separate threads. I can't figure out how to solve this, and I have no prior experience with threads, which doesn't help either... I need your help!
You will find below 2 versions of my code: the DUMMY VERSION and the ACTUAL VERSION. The DUMMY VERSION has an empty run_application()
, and it works as expected: I'm seeing:
"thread1 started
thread2 started
thread3 started"
displayed at the same time. However, the ACTUAL VERSION first displays "thread1 started", and after a delay of 5 seconds (because for testing purposing, I'm executing the external utility "sleep" with the parameter "5"), it displays "thread2 started", etc.
DUMMY VERSION
use kdam::{term, tqdm, BarExt};
use std::{thread, time::Duration};
fn run_application(progression: &mut kdam::Bar, application: &str, parametres: &[&str]) -> Result<(), ()> {
progression.write("application started").unwrap();
Ok(())
}
fn main() -> Result<(), ()> {
term::init(false);
let pb1 = tqdm!(total = 150, position = 0);
let pb_arc = std::sync::Arc::new(std::sync::Mutex::new(pb1));
let pb1_arc = pb_arc.clone();
let pb2_arc = pb_arc.clone();
let pb3_arc = pb_arc.clone();
let thread1 = thread::spawn(move || {
if run_application(&mut pb1_arc.lock().unwrap(), "sleep", &["5"]).is_ok() {
pb1_arc.lock().unwrap().write("thread1 started").unwrap();
for _ in 0..50 {
thread::sleep(Duration::from_secs_f32(0.1));
pb1_arc.lock().unwrap().update(1).unwrap();
}
}
});
let thread2 = thread::spawn(move || {
if run_application(&mut pb2_arc.lock().unwrap(), "sleep", &["5"]).is_ok() {
pb2_arc.lock().unwrap().write("thread2 started").unwrap();
for _ in 0..50 {
thread::sleep(Duration::from_secs_f32(0.1));
pb2_arc.lock().unwrap().update(1).unwrap();
}
}
});
let thread3 = thread::spawn(move || {
if run_application(&mut pb3_arc.lock().unwrap(), "sleep", &["5"]).is_ok() {
pb3_arc.lock().unwrap().write("thread3 started").unwrap();
for _ in 0..50 {
thread::sleep(Duration::from_secs_f32(0.1));
pb3_arc.lock().unwrap().update(1).unwrap();
}
}
});
// join other worker threads
for thread in [thread1, thread2, thread3] {
thread.join().unwrap();
}
eprint!("{}", "\n".repeat(3));
println!("completed!");
Ok(())
}
ACTUAL VERSION
use kdam::{term, tqdm, BarExt};
use std::{thread, time::Duration};
fn run_application(progression: &mut kdam::Bar, application: &str, parametres: &[&str]) -> Result<(), ()> {
let mut command = std::process::Command::new(application);
if let Ok(résultat) = command.args(parametres).output() {
let final_text = unsafe {
std::str::from_utf8_unchecked(&résultat.stderr)
};
// make use of final_text
progression.write("application started").unwrap();
Ok(())
} else {
progression.write("application failed to start").unwrap();
Err(())
}
}
fn main() -> Result<(), ()> {
term::init(false);
let pb1 = tqdm!(total = 150, position = 0);
let pb_arc = std::sync::Arc::new(std::sync::Mutex::new(pb1));
let pb1_arc = pb_arc.clone();
let pb2_arc = pb_arc.clone();
let pb3_arc = pb_arc.clone();
let thread1 = thread::spawn(move || {
if run_application(&mut pb1_arc.lock().unwrap(), "sleep", &["5"]).is_ok() {
pb1_arc.lock().unwrap().write("thread1 started").unwrap();
for _ in 0..50 {
thread::sleep(Duration::from_secs_f32(0.1));
pb1_arc.lock().unwrap().update(1).unwrap();
}
}
});
let thread2 = thread::spawn(move || {
if run_application(&mut pb2_arc.lock().unwrap(), "sleep", &["5"]).is_ok() {
pb2_arc.lock().unwrap().write("thread2 started").unwrap();
for _ in 0..50 {
thread::sleep(Duration::from_secs_f32(0.1));
pb2_arc.lock().unwrap().update(1).unwrap();
}
}
});
let thread3 = thread::spawn(move || {
if run_application(&mut pb3_arc.lock().unwrap(), "sleep", &["5"]).is_ok() {
pb3_arc.lock().unwrap().write("thread3 started").unwrap();
for _ in 0..50 {
thread::sleep(Duration::from_secs_f32(0.1));
pb3_arc.lock().unwrap().update(1).unwrap();
}
}
});
// join other worker threads
for thread in [thread1, thread2, thread3] {
thread.join().unwrap();
}
eprint!("{}", "\n".repeat(3));
println!("completed!");
Ok(())
}