Joining two threads, exit early in case of `Err`

Hi there!

I have a program that spawns two threads, each of which returns a Result<..., ...>. They are spawned inside a std::thread::scope call.

What I'd like to do is run them in parallel and then get the results, but exit early if one of the threads fails.

This means that if thread 1 is still doing its thing but thread 2 fails early, the scope must return immediately with the error. The same applies if thread 2 works but thread 1 fails. Otherwise, both threads must be waited for.

Here is the code I currently have:

let (a, b) = std::thread::scope(|s| -> Result<(..., ...), ...> {
  let a = s.spawn(|| do_something());
  let b = s.spawn(|| do_something());

  Ok((a.join().unwrap()?, b.join().unwrap()?))
})?;

Where the closure in spawn returns a Result<..., ...>.

Any idea how to do this? Do I have to use an async runtime for that?

Thanks in advance for your help!

I think threads generally can't be aborted unless you have some sort of "exit point". You could regularly check an atomic variable or queue to exit when the other thread panics (which you could catch with catch_unwind and set that atomic variable / send an exit notification to the other thread).

But not really sure if that works well.


I don't actually need to abort the other thread, I just need to get the result back. Also, I'm not trying to detect thread panics, only the case where the function called inside spawn(|| ...) returns an Err.

In that case you could use catch_unwind in both threads and notify the main thread which of the child threads ended (with Ok, Err, or through panicking). The main thread could then join that thread (or just use the result sent through an mpsc channel). I'll try to write an example.

Note that std::thread::scope can never exit before all threads spawned inside it have completed. If it did, you wouldn't be able to borrow non-'static data in the spawned threads.
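To illustrate the consequence: if returning to the caller right away matters more than borrowing local data, one option is to use detached std::thread::spawn threads plus a channel instead of a scope. This is only a rough sketch; slow_ok, fast_err and run_detached are hypothetical names made up for the example, and the closures have to own everything they use.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

// Hypothetical stand-ins for the real work.
fn slow_ok() -> Result<i32, String> {
    thread::sleep(Duration::from_millis(300));
    Ok(1)
}

fn fast_err() -> Result<i32, String> {
    Err("fast failure".to_string())
}

// Returns as soon as either task reports an Err; otherwise waits for both.
fn run_detached() -> Result<(i32, i32), String> {
    let (tx_a, rx) = mpsc::channel();
    let tx_b = tx_a.clone();

    // Detached threads: the closures must own everything they use ('static).
    thread::spawn(move || {
        tx_a.send((0usize, slow_ok())).ok();
    });
    thread::spawn(move || {
        tx_b.send((1usize, fast_err())).ok();
    });

    let mut slots: [Option<i32>; 2] = [None, None];
    // Results arrive in completion order. The `?` returns on the first Err
    // and leaves the other thread running in the background.
    for (index, result) in rx {
        slots[index] = Some(result?);
    }

    match slots {
        [Some(a), Some(b)] => Ok((a, b)),
        // Reached only if a thread ended (e.g. panicked) without sending.
        _ => Err("a thread ended without sending a result".to_string()),
    }
}
```

Calling run_detached() here gives back the Err from fast_err without waiting for slow_ok. The slow thread is not stopped, it just keeps running until it finishes or the process exits.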


In principle something like that:

use std::sync::mpsc;
use std::thread;

#[derive(Debug)]
enum Either {
    ThreadA(Result<i32, ()>),
    ThreadB(Result<i32, ()>),
}

fn main() {
    thread::scope(|s| {
        let (tx1, rx) = mpsc::sync_channel(1);
        let tx2 = tx1.clone();
        s.spawn(move || {
            tx1.send(Either::ThreadA(Ok(5))).ok();
        });
        s.spawn(move || {
            tx2.send(Either::ThreadB(Err(()))).ok();
        });
        println!("Got: {:?}", rx.recv());
    });
}


This doesn't handle panics or !Send errors (and it may block a thread indefinitely if you have more than two threads and the channel has a capacity of only 1), but maybe the idea helps you.
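Building on the same idea, here is a rough sketch of how the main thread could collect both results and stop at the first Err. The tasks sum and first_negative and the function run_scoped are hypothetical, and I swapped in an unbounded mpsc::channel so that a sender never blocks on a full buffer. Panics are still not handled specially.

```rust
use std::sync::mpsc;
use std::thread;

enum Either {
    ThreadA(Result<i32, String>),
    ThreadB(Result<i32, String>),
}

// Hypothetical tasks that borrow non-'static data, which is why a scope is used.
fn sum(data: &[i32]) -> Result<i32, String> {
    Ok(data.iter().sum())
}

fn first_negative(data: &[i32]) -> Result<i32, String> {
    data.iter()
        .copied()
        .find(|x| *x < 0)
        .ok_or_else(|| "no negative value".to_string())
}

fn run_scoped(data: &[i32]) -> Result<(i32, i32), String> {
    thread::scope(|s| -> Result<(i32, i32), String> {
        // Unbounded channel: sending never blocks.
        let (tx_a, rx) = mpsc::channel();
        let tx_b = tx_a.clone();
        s.spawn(move || {
            tx_a.send(Either::ThreadA(sum(data))).ok();
        });
        s.spawn(move || {
            tx_b.send(Either::ThreadB(first_negative(data))).ok();
        });

        let (mut a, mut b) = (None, None);
        // Messages arrive in completion order; `?` leaves the closure on the first Err.
        for message in rx {
            match message {
                Either::ThreadA(result) => a = Some(result?),
                Either::ThreadB(result) => b = Some(result?),
            }
        }
        // The loop ends once both senders are dropped.
        a.zip(b)
            .ok_or_else(|| "a thread ended without sending a result".to_string())
    })
}
```

Keep in mind that even when the closure returns early with the Err, thread::scope itself still waits for the other thread to finish before run_scoped returns. The early return only decides which value comes out, not how long the call takes.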


There's no way to automate that when using regular threads. There's no safe+reliable way to make a thread exit (things like pthread_cancel will cause leaks, poison locks, etc.). Threads can exit properly only if their code voluntarily and naturally stops (i.e. if you yourself write code by hand that returns from the thread or triggers an unwind, and Rust can't inject such code for you).

One way to do that is to have a flag:

let stopped = AtomicBool::new(false);

and in your thread's code, check whether stopped is true at every opportunity; when it is, return or panic. In the outer code, set it to true when one of the threads reports an error. If the threads are blocked waiting for something, they won't be able to read the atomic flag and react to it, so you will need to write a special-case solution to break that wait.
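A minimal sketch of that flag pattern with scoped threads could look like the following. TaskError, long_loop, run_task and run_pair are hypothetical names for the example, and the fail_at parameter only exists to simulate a failure. Here the wrapper run_task sets the flag as soon as its own task fails, which is one possible way to do the "outer code" part.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;

#[derive(Debug, PartialEq)]
enum TaskError {
    Cancelled,
    Failed(String),
}

// Hypothetical long loop that checks the shared flag on every iteration.
// `fail_at` simulates a real failure at a given iteration.
fn long_loop(stopped: &AtomicBool, iterations: u64, fail_at: Option<u64>) -> Result<u64, TaskError> {
    let mut sum = 0u64;
    for i in 0..iterations {
        if stopped.load(Ordering::Relaxed) {
            return Err(TaskError::Cancelled);
        }
        if fail_at == Some(i) {
            return Err(TaskError::Failed(format!("failed at iteration {}", i)));
        }
        sum = sum.wrapping_add(i);
    }
    Ok(sum)
}

// Runs the loop and raises the flag if it ended with an error.
fn run_task(stopped: &AtomicBool, iterations: u64, fail_at: Option<u64>) -> Result<u64, TaskError> {
    let result = long_loop(stopped, iterations, fail_at);
    if result.is_err() {
        stopped.store(true, Ordering::Relaxed);
    }
    result
}

fn run_pair() -> Result<(u64, u64), TaskError> {
    let stopped = AtomicBool::new(false);
    let (a, b) = thread::scope(|s| {
        // Task A would run practically forever; task B fails early.
        let a = s.spawn(|| run_task(&stopped, u64::MAX, None));
        let b = s.spawn(|| run_task(&stopped, 1_000, Some(10)));
        (a.join().unwrap(), b.join().unwrap())
    });
    match (a, b) {
        (Ok(a), Ok(b)) => Ok((a, b)),
        // Prefer the real failure over the cancellation it caused.
        (Err(TaskError::Failed(msg)), _) | (_, Err(TaskError::Failed(msg))) => {
            Err(TaskError::Failed(msg))
        }
        _ => Err(TaskError::Cancelled),
    }
}
```

In this sketch task B fails at iteration 10, the flag is raised, and task A notices it on its next check and returns Cancelled, so both joins complete quickly. Checking on every iteration is probably fine for a flag like this, but in a very hot loop you might check only every N iterations.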

Alternatively, if the threads are waiting for I/O, then you should use async/await. Futures are very easy to abort at any .await point. The basic try_join! macro does exactly what you want.


I think that is indeed the problem, I tried using Rayon as it doesn't always use multiple threads, and wondered why the iterator didn't return when I had the first Err when collecting to a Result<..., ...>. And it seems like it's just because the thread can't be brutally stopped, which seems logical when I think about it.

So I'll try to pass an AtomicBool to stop the process as the long function I run in parallel is about iterating on a really huge loop.

Thanks for your help!

