Using Arc<()> to terminate a thread

I sometimes come across the problem of needing to terminate threads. I could use channels to signal threads to terminate. But I think I found an easier solution.

I use an Arc<()> which I clone. One of those Arcs stays in the controlling thread, and one of the Arcs goes to the child thread.

When I want to terminate the child, I simply drop the Arc in the controlling thread. In the child thread, I check whether the Arc's strong_count is greater than 1. If it's not, I terminate the child thread gracefully.

This is what it looks like:

use std::sync::Arc;
use std::thread::{sleep, spawn};
use std::time::Duration;

fn main() {
    let keepalive_send = Arc::new(());
    let keepalive_recv = keepalive_send.clone();
    let join_handle = spawn(move || {
        let mut counter = 0;
        while Arc::strong_count(&keepalive_recv) > 1 {
            // do something here:
            counter += 1;
            println!("running ({counter})...");
            sleep(Duration::from_millis(150));
        }
        counter
    });
    // do something here:
    sleep(Duration::from_millis(1000));
    // terminate spawned thread:
    drop(keepalive_send);
    let result = join_handle.join().unwrap();
    println!("Result: {result}");
}

(Playground)

I wonder: Is this a good approach to the problem? Or is there an easier or more idiomatic way?

(I think it should be pretty efficient. The strong_count method just performs an atomic read with Acquire ordering.)

I considered using an AtomicBool instead, but then I would need a mechanism that ensures the child gets notified to terminate if the controlling thread panics. With the Arc, this happens automatically: as soon as keepalive_send is dropped, the child terminates in its next loop iteration.


I think it's a reasonable solution, but it would be a bit more idiomatic/semantically correct to leave the only copy of the strong Arc in the parent thread, and only leave a weak reference in the child. Then you could do

while keepalive_recv.upgrade().is_some() {
    ...
}

which IMO signals the intention of single ownership better.

I would also recommend newtyping both halves, so that you can't accidentally (or intentionally) clone either half, and there is always truly a single owner only. I'd do that at least for the receiving end so that the child can't keep itself alive.

Actually, it's pretty easy to incorporate this into a single newtype and a function hiding the implementation details of the keepalive-checking loop (Playground):

use core::time::Duration;
use core::ops::Deref;
use std::sync::Arc;
use std::thread::{self, JoinHandle};

#[derive(Debug)]
pub struct KeepaliveHandle {
    keepalive: Arc<()>,
    join: JoinHandle<()>,
}

impl Deref for KeepaliveHandle {
    type Target = JoinHandle<()>;
    
    fn deref(&self) -> &Self::Target {
        &self.join
    }
}

pub fn spawn_keepalive<F: FnMut() + Send + 'static>(mut worker: F) -> KeepaliveHandle {
    let keepalive = Arc::default();
    let recv = Arc::downgrade(&keepalive);
    
    let join = thread::spawn(move || {
        while recv.upgrade().is_some() {
            worker();
        }
    });
    
    KeepaliveHandle { keepalive, join }
}

fn main() {
    let mut counter = 0;
    let handle = spawn_keepalive(move || {
        println!("working: {}", counter);
        counter += 1;
        thread::sleep(Duration::from_secs(1));
    });
    
    println!("Spawned thread");
    thread::sleep(Duration::from_secs(5));
    drop(handle);
    println!("Finished");
}

Did you consider using


While semantically elegant, this repeatedly increments and decrements the strong count, rather than only reading it. How about keepalive_recv.strong_count() > 0?


This is explicitly pointed out in the OP along with why it doesn't work:

Indeed, but I doubt that overhead would be noticeable next to any work that is worth outsourcing to a separate worker thread.

Ah, I should have read more carefully to the end.

My overall feeling is that this doesn't seem like an elegant design, but without knowing more about the full requirements, it is a bit hard to say.

Destructors still run when there is a panic, so I think a struct which resets an inner atomic bool in Drop::drop should do the job?
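A minimal sketch of that idea, assuming the default panic = "unwind" behaviour (with panic = "abort" no destructors run). AliveGuard and flag_after_controller_panic are made-up names for illustration only:

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;

/// Hypothetical guard: clears the shared flag when it is dropped,
/// including when it is dropped during panic unwinding.
pub struct AliveGuard(Arc<AtomicBool>);

impl Drop for AliveGuard {
    fn drop(&mut self) {
        self.0.store(false, Ordering::Relaxed);
    }
}

/// Lets a controller thread panic while it owns the guard and returns
/// the value of the flag afterwards.
pub fn flag_after_controller_panic() -> bool {
    let alive = Arc::new(AtomicBool::new(true));
    let guard = AliveGuard(Arc::clone(&alive));

    let controller = thread::spawn(move || {
        // The guard lives on this thread's stack and is dropped
        // while the panic unwinds.
        let _guard = guard;
        panic!("controller failed");
    });

    // join() returns Err because the thread panicked; that is expected here.
    let _ = controller.join();
    alive.load(Ordering::Relaxed)
}
```

A child thread holding another clone of the Arc<AtomicBool> would see the flag flip to false on its next check, whether the controller dropped the guard deliberately or panicked.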


A better solution would be to use a channel of ()'s. Perhaps something like this:

let (sender, receiver) = std::sync::mpsc::channel();

std::thread::spawn(move || {
  expensive_background_computation();
  // indicate that the task has completed successfully;
  // the error case (receiver already dropped) is deliberately ignored
  let _ = sender.send(());
});

match receiver.recv() {
  Ok(()) => {
    // the thread exited normally
  }
  Err(_) => {
    // all senders have been dropped, meaning the background
    // thread exited abnormally.
  }
}

The benefit of this approach is you are reusing std::sync::mpsc's machinery for parking threads and waking them up again, meaning you are notified immediately (a 150ms sleep in your Arc::strong_count() loop means you'll have 75ms lag on average) and you avoid the inelegant busy loop.

This is a pretty common pattern in Go for notifying other goroutines when something happens.


Thank you for all the responses so far.

I tried to make things more "idiomatic", and came up with this:

use std::thread::{sleep, spawn};
use std::time::Duration;

pub mod keepalive {
    use std::sync::{Arc, Weak};

    pub struct Sender(Arc<()>);

    #[derive(Clone)]
    pub struct Receiver(Weak<()>);
    
    pub fn channel() -> (Sender, Receiver) {
        let arc = Arc::new(());
        let weak = Arc::downgrade(&arc);
        (Sender(arc), Receiver(weak))
    }
    
    impl Receiver {
        pub fn is_alive(&self) -> bool {
            Weak::strong_count(&self.0) > 0
        }
    }
}

fn main() {
    let (keepalive_send, keepalive_recv) = keepalive::channel();
    let join_handle = spawn(move || {
        let mut counter = 0;
        while keepalive_recv.is_alive() {
            // do something here:
            counter += 1;
            println!("running ({counter})...");
            sleep(Duration::from_millis(150));
        }
        counter
    });
    // do something here:
    sleep(Duration::from_millis(1000));
    // terminate spawned thread:
    drop(keepalive_send);
    let result = join_handle.join().unwrap();
    println!("Result: {result}");
}

(Playground)

Here, the child thread cannot "accidentally" keep itself alive, yet it's possible to clone the receiver if desired. But it's not possible to clone the sender.

I'm not sure that really makes things better (unless it were hidden in some crate, or perhaps became part of std or tokio::sync in this or a similar form in the future). What I liked about Arc<()> is that it's so simple, but I also feel like it's a bit confusing / abusive.

I guess what would be better is a Relaxed ordering, which would be even more efficient, right? There is Arc::try_unwrap, which does a Relaxed(!) compare_exchange in the error case (and will only acquire if it succeeds). Maybe that is faster?

Let's try it without hiding the details, just simply using Arc:

use std::sync::Arc;
use std::thread::{sleep, spawn};
use std::time::Duration;

fn main() {
    let keepalive_send = Arc::new(());
    let mut keepalive_recv = keepalive_send.clone();
    let join_handle = spawn(move || {
        let mut counter = 0;
        loop {
            match Arc::try_unwrap(keepalive_recv) {
                Ok(()) => break,
                Err(err) => keepalive_recv = err,
            }
            // do something here:
            counter += 1;
            println!("running ({counter})...");
            sleep(Duration::from_millis(150));
        }
        counter
    });
    // do something here:
    sleep(Duration::from_millis(1000));
    // terminate spawned thread:
    drop(keepalive_send);
    let result = join_handle.join().unwrap();
    println!("Result: {result}");
}

(Playground)

@Michael-F-Bryan: I don't see how I can "interrupt" the expensive_background_computation in your example.

I feel like none of the solutions presented so far get to the simplicity of the original idea. However, I understand that wrapping everything in a newtype pattern could make semantics more clear.

You can check whether the sender's thread has panicked by calling Receiver::try_recv() and checking the return value: it tells you whether the sender completed normally (Ok(())), is still running (TryRecvError::Empty), or panicked (TryRecvError::Disconnected).
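The same three outcomes can also be used the other way around, with the controlling thread holding the Sender and the worker polling the Receiver between units of work. This is only a sketch under that assumption; work_until_stopped is a hypothetical helper, not something from the posts above:

```rust
use std::sync::mpsc::{Receiver, TryRecvError};

/// Hypothetical helper: calls `work` repeatedly until a stop message arrives
/// or every sender has been dropped. Returns the number of completed iterations.
pub fn work_until_stopped(stop_rx: Receiver<()>, mut work: impl FnMut()) -> u32 {
    let mut iterations = 0;
    loop {
        match stop_rx.try_recv() {
            // Nothing was sent and a sender still exists: keep working.
            Err(TryRecvError::Empty) => {}
            // An explicit stop message, or the sender was dropped
            // (deliberately or while its thread was panicking): stop.
            Ok(()) | Err(TryRecvError::Disconnected) => break,
        }
        work();
        iterations += 1;
    }
    iterations
}
```

The controller would create the pair with std::sync::mpsc::channel::<()>(), move the Receiver into thread::spawn together with the work closure, and drop the Sender to end the loop. Like the Arc<()> version, the worker only notices between two calls to work, so a call that blocks is not interrupted.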

I normally find solutions that involve blindly polling a condition and sleeping for an arbitrary amount of time to be inelegant.

Sure, it's trivial to implement, but it comes across as naive/hacky because you're essentially guessing how long to sleep for instead of notifying the other side immediately, as something involving Drop would (i.e. channels). Polling also seems inefficient considering we already have proper mechanisms for putting a thread to sleep until a certain condition is satisfied (Condvar, which most channel implementations use under the hood).
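To make the point concrete for the case where the loop's wait really is a plain sleep: replacing the sleep with Receiver::recv_timeout keeps the periodic behaviour but wakes the worker as soon as the sender is dropped. A rough sketch, where tick_until_stopped and its parameters are made up for illustration:

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::Duration;

/// Hypothetical helper: runs `work` once per `interval` until a stop
/// message arrives or every sender has been dropped.
/// Returns how many times `work` was called.
pub fn tick_until_stopped(
    stop_rx: Receiver<()>,
    interval: Duration,
    mut work: impl FnMut(),
) -> u32 {
    let mut ticks = 0;
    loop {
        work();
        ticks += 1;
        // Waits for at most `interval`, but returns right away when a
        // message arrives or the channel is disconnected.
        match stop_rx.recv_timeout(interval) {
            Err(RecvTimeoutError::Timeout) => continue,
            Ok(()) | Err(RecvTimeoutError::Disconnected) => break,
        }
    }
    ticks
}
```

This only removes the sleep-related lag. It does not help if work itself blocks for a long time.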

I'm not sure if I understand completely, but I think you might have misunderstood my example. The sleep is just for demonstration purposes. My real-world code doesn't sleep but it waits for I/O (from a library that doesn't support async, so it will block the thread). It's nothing I can work around, I think (soapysdr::RxStream::read).


Ah okay, I saw the sleep inside your while Arc::strong_count(&keepalive_recv) > 1 loop and it looked like you were just polling until the condition was satisfied and had thrown in a thread::sleep() to avoid pegging your CPU at 100%.

As others have mentioned, I would argue that Arc<AtomicBool> is probably a better signal. Most notably, it can be used to signal multiple threads to terminate in one fell swoop, rather than requiring one Arc<()> per child thread.

The only drawback is having to write a Drop implementation so that the "owner" thread automatically signals the child threads to terminate in case of a panic. That's easy enough -- it's just setting the boolean to true, after all.


The case of signaling multiple threads (i.e. having multiple receivers) has already been solved by using Weak for the receiver in this Playground of my post here. I don't need an AtomicBool for it, I believe.
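For illustration, a stripped-down sketch of that multi-receiver case without the newtypes: one Arc<()> in the controlling thread and one Weak<()> per worker. stop_all_workers is a hypothetical function, and the short sleep just stands in for real work:

```rust
use std::sync::{Arc, Weak};
use std::thread;
use std::time::Duration;

/// Spawns `n_workers` threads that all watch the same Arc<()> through
/// their own Weak<()>, drops the Arc, and returns how many workers
/// terminated cleanly.
pub fn stop_all_workers(n_workers: usize) -> usize {
    let keepalive = Arc::new(());

    let handles: Vec<_> = (0..n_workers)
        .map(|_| {
            let recv: Weak<()> = Arc::downgrade(&keepalive);
            thread::spawn(move || {
                // Only reads the strong count; never upgrades.
                while recv.strong_count() > 0 {
                    thread::sleep(Duration::from_millis(5)); // stand-in for work
                }
            })
        })
        .collect();

    // A single drop signals every worker at once.
    drop(keepalive);

    handles
        .into_iter()
        .map(|handle| handle.join())
        .filter(Result::is_ok)
        .count()
}
```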

However, I could use the AtomicBool to get Relaxed ordering and avoid the compare_exchange of Arc::try_unwrap (source), which I previously considered in that other Playground in the same post above.

This would lead to a very efficient is_alive method, I think:

pub mod keepalive {
    use std::sync::atomic::{AtomicBool, Ordering::Relaxed};
    use std::sync::Arc;

    #[derive(Debug)]
    pub struct Sender(Arc<AtomicBool>);

    #[derive(Clone, Debug)]
    pub struct Receiver(Arc<AtomicBool>);
    
    pub fn channel() -> (Sender, Receiver) {
        let arc1 = Arc::new(AtomicBool::new(true));
        let arc2 = arc1.clone();
        (Sender(arc1), Receiver(arc2))
    }
    
    impl Drop for Sender {
        fn drop(&mut self) {
            self.0.store(false, Relaxed);
        }
    }
    
    impl Receiver {
        pub fn is_alive(&self) -> bool {
            self.0.load(Relaxed)
        }
    }
}

(Playground)

Using Weak for the receiver in combination with accessing an AtomicBool behind it is a bad idea, I think, because this would require repeatedly incrementing and decrementing the strong count, as in the case @kpreid commented on before.


Agree. I was thinking of channels in Go when noticing this post.