Thread woes, or How to call `std::process::Command::output()` in parallel?

Hello,

Let's say I have a run_application() function that executes an external utility and collects its output, so basically it calls std::process::Command::output(). The code works great when run_application() is called normally, from the main thread. But to take advantage of my computer's multiple cores and speed things up, I would like to have several instances of run_application() running in parallel, each processing a different file. For this purpose, I'm creating 3 threads, each of which calls run_application() on its own. I suppose this is the standard approach (correct me if I'm wrong).

But for some reason, std::process::Command::output() inside run_application() blocks the execution until the current thread is finished. In other words, only 1 thread is working at a time, even though I'm creating 3 separate threads. I can't figure out how to solve this, and I have no prior experience with threads, which doesn't help either... I need your help!

You will find below 2 versions of my code: the DUMMY VERSION and the ACTUAL VERSION. The DUMMY VERSION has an empty run_application(), and it works as expected: I'm seeing:

"thread1 started
thread2 started
thread3 started"

displayed at the same time. However, the ACTUAL VERSION first displays "thread1 started", and after a delay of 5 seconds (because for testing purposes, I'm executing the external utility "sleep" with the parameter "5"), it displays "thread2 started", etc.

DUMMY VERSION

use kdam::{term, tqdm, BarExt};
use std::{thread, time::Duration};

fn run_application(progression: &mut kdam::Bar, application: &str, parametres: &[&str]) -> Result<(), ()> {
    progression.write("application started").unwrap();
    Ok(())
}

fn main() -> Result<(), ()> {
    term::init(false);

    let pb1 = tqdm!(total = 150, position = 0);
    let pb_arc = std::sync::Arc::new(std::sync::Mutex::new(pb1));
    let pb1_arc = pb_arc.clone();
    let pb2_arc = pb_arc.clone();
    let pb3_arc = pb_arc.clone();

    let thread1 = thread::spawn(move || {
        if run_application(&mut pb1_arc.lock().unwrap(), "sleep", &["5"]).is_ok() {
            pb1_arc.lock().unwrap().write("thread1 started").unwrap();
            for _ in 0..50 {
                thread::sleep(Duration::from_secs_f32(0.1));
                pb1_arc.lock().unwrap().update(1).unwrap();
            }
        }
    });

    let thread2 = thread::spawn(move || {
        if run_application(&mut pb2_arc.lock().unwrap(), "sleep", &["5"]).is_ok() {
            pb2_arc.lock().unwrap().write("thread2 started").unwrap();
            for _ in 0..50 {
                thread::sleep(Duration::from_secs_f32(0.1));
                pb2_arc.lock().unwrap().update(1).unwrap();
            }
        }
    });

    let thread3 = thread::spawn(move || {
        if run_application(&mut pb3_arc.lock().unwrap(), "sleep", &["5"]).is_ok() {
            pb3_arc.lock().unwrap().write("thread3 started").unwrap();
            for _ in 0..50 {
                thread::sleep(Duration::from_secs_f32(0.1));
                pb3_arc.lock().unwrap().update(1).unwrap();
            }
        }
    });

    // join other worker threads
    for thread in [thread1, thread2, thread3] {
        thread.join().unwrap();
    }

    eprint!("{}", "\n".repeat(3));
    println!("completed!");

    Ok(())
}

ACTUAL VERSION

use kdam::{term, tqdm, BarExt};
use std::{thread, time::Duration};

fn run_application(progression: &mut kdam::Bar, application: &str, parametres: &[&str]) -> Result<(), ()> {
    let mut command = std::process::Command::new(application);
    if let Ok(résultat) = command.args(parametres).output() {
        // Decode stderr lossily: invalid UTF-8 is replaced with U+FFFD,
        // whereas from_utf8_unchecked would be undefined behaviour on it.
        let final_text = String::from_utf8_lossy(&résultat.stderr);
        // make use of final_text
        progression.write("application started").unwrap();
        Ok(())
    } else {
        progression.write("application failed to start").unwrap();
        Err(())
    }
}

fn main() -> Result<(), ()> {
    term::init(false);

    let pb1 = tqdm!(total = 150, position = 0);
    let pb_arc = std::sync::Arc::new(std::sync::Mutex::new(pb1));
    let pb1_arc = pb_arc.clone();
    let pb2_arc = pb_arc.clone();
    let pb3_arc = pb_arc.clone();

    let thread1 = thread::spawn(move || {
        if run_application(&mut pb1_arc.lock().unwrap(), "sleep", &["5"]).is_ok() {
            pb1_arc.lock().unwrap().write("thread1 started").unwrap();
            for _ in 0..50 {
                thread::sleep(Duration::from_secs_f32(0.1));
                pb1_arc.lock().unwrap().update(1).unwrap();
            }
        }
    });

    let thread2 = thread::spawn(move || {
        if run_application(&mut pb2_arc.lock().unwrap(), "sleep", &["5"]).is_ok() {
            pb2_arc.lock().unwrap().write("thread2 started").unwrap();
            for _ in 0..50 {
                thread::sleep(Duration::from_secs_f32(0.1));
                pb2_arc.lock().unwrap().update(1).unwrap();
            }
        }
    });

    let thread3 = thread::spawn(move || {
        if run_application(&mut pb3_arc.lock().unwrap(), "sleep", &["5"]).is_ok() {
            pb3_arc.lock().unwrap().write("thread3 started").unwrap();
            for _ in 0..50 {
                thread::sleep(Duration::from_secs_f32(0.1));
                pb3_arc.lock().unwrap().update(1).unwrap();
            }
        }
    });

    // join other worker threads
    for thread in [thread1, thread2, thread3] {
        thread.join().unwrap();
    }

    eprint!("{}", "\n".repeat(3));
    println!("completed!");

    Ok(())
}

The problem is that you have one mutex shared among 3 threads, and it stays locked for the entire duration of the call to run_application().

When the code

run_application(&mut pb3_arc.lock().unwrap(), ...)

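runs, the Mutex is locked and a reference to the contained value is passed in to run_application(). This requires that the mutex lock be held until run_application() returns, or else the mutex would allow race conditions. The other two threads therefore wait at their own lock() call for the whole 5 seconds.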

You might be able to solve it by passing the Mutex itself in to the run_application() function, and then just locking the Mutex for short periods while doing something specific inside it.
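
To make the difference visible, here is a minimal sketch using only the standard library. It is not the original program: the Mutex<u32> stands in for the progress bar, and takes_value() and takes_mutex() are hypothetical names for the "before" and "after" shapes of run_application(). It uses try_lock() to report whether the mutex is held while each function runs.

```rust
use std::sync::Mutex;

// Hypothetical stand-in for the original run_application(): it receives the
// value borrowed out of the lock guard, and reports whether the mutex is
// still locked while the function body runs.
fn takes_value(_value: &mut u32, mutex: &Mutex<u32>) -> bool {
    mutex.try_lock().is_err()
}

// Hypothetical stand-in for the suggested fix: it receives the mutex itself
// and locks it only for the one statement that needs the value.
fn takes_mutex(mutex: &Mutex<u32>) -> bool {
    // The guard returned by try_lock() is dropped at the end of this statement.
    let free_on_entry = mutex.try_lock().is_ok();
    // Short critical section: lock, update, unlock.
    *mutex.lock().unwrap() += 1;
    free_on_entry
}

fn main() {
    let mutex = Mutex::new(0u32);

    // The temporary guard created in the argument list lives until the end
    // of this statement, so the mutex is held for the whole call.
    let held = takes_value(&mut mutex.lock().unwrap(), &mutex);
    println!("mutex held during takes_value: {held}");

    // Here nothing is locked by the caller, so other threads could proceed.
    let free = takes_mutex(&mutex);
    println!("mutex free on entry to takes_mutex: {free}");
}
```

In the first call the guard is created by the caller and outlives the callee, which is what serialises the threads. In the second, anything slow (such as waiting for an external process) can happen inside the function while the mutex is unlocked.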

Ah, fantastic! I replaced the parameter:

progression: &mut kdam::Bar

with:

progression_arc: std::sync::Arc<std::sync::Mutex<kdam::Bar>>

and everything is working now. See WORKING CODE below. Thank you very much! One final question: I am ashamed to admit that I came up with Arc<Mutex> by trial and error, in an attempt to make all threads share the same progress bar. I wonder if it's the best approach. I noticed that you refer to Mutex and not Arc<Mutex>. Is this intentional? Are you hinting that Arc can be omitted, and if yes, how?

WORKING CODE

use kdam::{term, tqdm, BarExt};
use std::{thread, time::Duration};

fn run_application(progression_arc: std::sync::Arc<std::sync::Mutex<kdam::Bar>>, application: &str, parametres: &[&str]) -> Result<(), ()> {
    let mut command = std::process::Command::new(application);
    if let Ok(résultat) = command.args(parametres).output() {
        // Decode stderr lossily: invalid UTF-8 is replaced with U+FFFD,
        // whereas from_utf8_unchecked would be undefined behaviour on it.
        let final_text = String::from_utf8_lossy(&résultat.stderr);
        // make use of final_text
        progression_arc.lock().unwrap().write("application started").unwrap();
        Ok(())
    } else {
        progression_arc.lock().unwrap().write("application failed to start").unwrap();
        Err(())
    }
}

fn main() -> Result<(), ()> {
    term::init(false);

    let pb1 = tqdm!(total = 150, position = 0);
    let pb_arc = std::sync::Arc::new(std::sync::Mutex::new(pb1));
    let pb1_arc = pb_arc.clone();
    let pb2_arc = pb_arc.clone();
    let pb3_arc = pb_arc.clone();

    let thread1 = thread::spawn(move || {
        if run_application(pb1_arc.clone(), "sleep", &["5"]).is_ok() {
            pb1_arc.lock().unwrap().write("thread1 started").unwrap();
            for _ in 0..50 {
                thread::sleep(Duration::from_secs_f32(0.1));
                pb1_arc.lock().unwrap().update(1).unwrap();
            }
        }
    });

    let thread2 = thread::spawn(move || {
        if run_application(pb2_arc.clone(), "sleep", &["5"]).is_ok() {
            pb2_arc.lock().unwrap().write("thread2 started").unwrap();
            for _ in 0..50 {
                thread::sleep(Duration::from_secs_f32(0.1));
                pb2_arc.lock().unwrap().update(1).unwrap();
            }
        }
    });

    let thread3 = thread::spawn(move || {
        if run_application(pb3_arc.clone(), "sleep", &["5"]).is_ok() {
            pb3_arc.lock().unwrap().write("thread3 started").unwrap();
            for _ in 0..50 {
                thread::sleep(Duration::from_secs_f32(0.1));
                pb3_arc.lock().unwrap().update(1).unwrap();
            }
        }
    });

    // join other worker threads
    for thread in [thread1, thread2, thread3] {
        thread.join().unwrap();
    }

    eprint!("{}", "\n".repeat(3));
    println!("completed!");

    Ok(())
}

You're right, you have to pass the Mutex inside an Arc to share it. I was taking a mental shortcut around that aspect for a moment :slight_smile:
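
For what it's worth, the Arc is needed because thread::spawn() requires its closure to own everything it uses, but the function being called can still take a plain &Mutex<...>. A rough sketch of that split, with a Mutex<u32> counter standing in for the progress bar and record_step() / run_with_arc() as made-up names:

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical worker function: it borrows the mutex and does not need to
// know that the caller keeps it inside an Arc.
fn record_step(progress: &Mutex<u32>) {
    *progress.lock().unwrap() += 1;
}

// Spawns `workers` threads that share one counter through Arc<Mutex<u32>>.
fn run_with_arc(workers: usize) -> u32 {
    let progress = Arc::new(Mutex::new(0u32));

    let handles: Vec<_> = (0..workers)
        .map(|_| {
            // Each thread gets its own Arc handle to the same mutex.
            let progress = Arc::clone(&progress);
            // &Arc<Mutex<u32>> coerces to &Mutex<u32> at the call site.
            thread::spawn(move || record_step(&progress))
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }

    let total = *progress.lock().unwrap();
    total
}

fn main() {
    println!("steps recorded: {}", run_with_arc(3));
}
```

Taking &Mutex<...> in the function signature keeps the Arc an implementation detail of whoever spawns the threads.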

Ah I see... I honestly wasn't sure. Thank you very much again! Topic solved.

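Using std::thread::scope you can do away with Arc, because scoped threads are guaranteed to finish before the scope returns and can therefore borrow the Mutex directly.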
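
A minimal sketch of what that could look like with std::thread::scope (stable since Rust 1.63). This is an illustration rather than a drop-in replacement: the Mutex<u32> counter stands in for the kdam progress bar, the short sleep stands in for the real work, and run_scoped() is a made-up name.

```rust
use std::sync::Mutex;
use std::thread;
use std::time::Duration;

// Spawns `workers` scoped threads that each bump a shared counter `steps`
// times, then returns the final count.
fn run_scoped(workers: usize, steps: u32) -> u32 {
    // Plain Mutex on the stack: no Arc, no clones.
    let progress = Mutex::new(0u32);

    thread::scope(|s| {
        for _ in 0..workers {
            // The closure only borrows `progress` and `steps`; the scope
            // guarantees the threads are joined before those borrows end.
            s.spawn(|| {
                for _ in 0..steps {
                    thread::sleep(Duration::from_millis(1)); // simulated work
                    *progress.lock().unwrap() += 1; // short critical section
                }
            });
        }
    }); // all scoped threads have been joined here

    progress.into_inner().unwrap()
}

fn main() {
    println!("total updates: {}", run_scoped(3, 50));
}
```

The explicit join loop also goes away, since the scope joins every thread it spawned before returning.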
