Multi-thread shutdown, best practices, 3 years later

Previous discussion from 2021

This is still unreasonably hard to do. Any progress in the last 3 years?

You can close the sender end(s) to force an EOF on the receiver, and that happens implicitly when a Sender is dropped. The idea is to abort waits from a Drop: have the thread exit, join the thread in the Drop, then clean up.
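A minimal sketch of that Drop-based pattern with std::sync::mpsc. It assumes a hypothetical Worker type that holds the only Sender to its thread: dropping that Sender gives the thread an EOF, and Drop then joins it.

```rust
use std::sync::mpsc::{self, Sender};
use std::thread::{self, JoinHandle};

// Hypothetical worker: sums the numbers it receives until its channel closes.
pub struct Worker {
    tx: Option<Sender<u64>>,         // the only Sender; dropping it means EOF
    handle: Option<JoinHandle<u64>>, // joined exactly once, in stop()
}

impl Worker {
    pub fn spawn() -> Worker {
        let (tx, rx) = mpsc::channel::<u64>();
        let handle = thread::spawn(move || {
            let mut total = 0u64;
            // recv() returns Err once every Sender has been dropped.
            while let Ok(n) = rx.recv() {
                total += n;
            }
            total
        });
        Worker { tx: Some(tx), handle: Some(handle) }
    }

    pub fn send(&self, n: u64) {
        if let Some(tx) = &self.tx {
            let _ = tx.send(n);
        }
    }

    /// Explicit shutdown: returns the worker's result, or None if it panicked.
    pub fn shutdown(mut self) -> Option<u64> {
        self.stop().and_then(|r| r.ok())
    }

    // Close the channel first so the thread sees EOF, then join it.
    fn stop(&mut self) -> Option<thread::Result<u64>> {
        drop(self.tx.take());
        self.handle.take().map(|h| h.join())
    }
}

impl Drop for Worker {
    fn drop(&mut self) {
        // Same sequence if shutdown() was never called; a no-op if it was.
        let _ = self.stop();
    }
}
```

This only works because the Worker owns the single Sender; as soon as a clone escapes, dropping the Worker's copy no longer produces the EOF.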

But if there are threads with channels in both directions, no order of drops will work. There's a deadlock. You can't kick a .recv() out of a wait if you only have the Receiver. And if you have a clone of the Sender, the channel won't close until all the clones have been dropped. So shutting things down from the receiver end is very tough.
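A small demonstration of the clone problem with std::sync::mpsc (the function name disconnect_states is hypothetical): dropping one Sender leaves the channel open while any clone is still alive, so a thread blocked in recv() would keep waiting. In the bidirectional case each thread holds a Sender into the other's channel, so neither channel can disconnect until the other thread has already exited.

```rust
use std::sync::mpsc::{self, TryRecvError};

/// Returns what the receiver observes (a) after one of two Senders is dropped
/// and (b) after the last one is dropped.
pub fn disconnect_states() -> (TryRecvError, TryRecvError) {
    let (tx, rx) = mpsc::channel::<()>();
    let tx_clone = tx.clone(); // imagine this clone is held by another thread

    drop(tx);
    // Still Empty, not Disconnected: a blocking recv() here would keep waiting.
    let with_clone_alive = rx.try_recv().unwrap_err();

    drop(tx_clone);
    // Only now does the receiver see the disconnect.
    let after_last_drop = rx.try_recv().unwrap_err();

    (with_clone_alive, after_last_drop)
}
```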

What would help is an "abort" function for a Receiver, one which would make any thread waiting in recv() get an error return. Are there any channel crates which implement that?
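std's mpsc has no receiver-side abort. One common workaround, which is a different technique from the abort function asked about, is an explicit shutdown message. Sketched here with a hypothetical Msg enum, it stops the worker even while other Sender clones are still alive:

```rust
use std::sync::mpsc::{self, Sender};
use std::thread::{self, JoinHandle};

// Hypothetical message type: real work plus an explicit "stop now" request.
#[derive(Debug)]
pub enum Msg {
    Work(u32),
    Shutdown,
}

/// Spawns a worker that sums Work values until it gets Shutdown or EOF.
pub fn spawn_worker() -> (Sender<Msg>, JoinHandle<u32>) {
    let (tx, rx) = mpsc::channel::<Msg>();
    let handle = thread::spawn(move || {
        let mut total: u32 = 0;
        loop {
            match rx.recv() {
                Ok(Msg::Work(n)) => total += n,
                // Either path ends the loop, so Sender clones still held
                // elsewhere cannot keep this thread alive after Shutdown.
                Ok(Msg::Shutdown) | Err(_) => break,
            }
        }
        total
    });
    (tx, handle)
}
```

The catch is that whoever wants to stop the thread still needs a Sender, and messages queued ahead of Shutdown are processed first.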

Why not use CancellationToken from tokio_util::sync (docs.rs)?

I don't quite understand what you mean by a bidirectional pipe. In theory, a thread always belongs to another thread; that is, threads also have ownership, and only the owning thread can shut down its child thread. A bidirectional pipe sounds like a circular reference.

That involves Tokio and is an async function. Not using async, because this is a compute-bound system with multiple threads at different priorities.

Whether or not the function is asynchronous, the concept is similar. If your threads are compute threads, it might be better to use a fork-join mechanism. This is similar to how CUDA is used.
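A minimal fork-join sketch using std::thread::scope (stable since Rust 1.63); parallel_sum is a hypothetical example function. The scope joins every thread spawned inside it before returning, so no thread outlives its work and no explicit shutdown signal is needed.

```rust
use std::thread;

/// Fork-join sum: split the slice into chunks, sum each chunk on its own
/// thread, then join all of them and combine the partial results.
pub fn parallel_sum(data: &[u64], n_threads: usize) -> u64 {
    let n_threads = n_threads.max(1);
    // Ceiling division; max(1) avoids chunks(0), which would panic.
    let chunk = ((data.len() + n_threads - 1) / n_threads).max(1);
    thread::scope(|s| {
        // Fork: one scoped thread per chunk, borrowing `data` directly.
        let handles: Vec<_> = data
            .chunks(chunk)
            .map(|part| s.spawn(move || part.iter().sum::<u64>()))
            .collect();
        // Join: collect the partial sums.
        handles.into_iter().map(|h| h.join().unwrap()).sum()
    })
}
```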


You do not explain exactly what you need to do before the program terminates.

In my case, I have a process that asynchronously writes data to the database (not with the async keyword, just a regular thread), and I wait for it to complete (using a mutex which is locked while it is working). However, other threads which may be running can simply be terminated when the program exits.

So what is your specific problem?

Failing that, a global "everybody shut down" flag will work. Any standard way to do that?

Sure, an Arc<AtomicBool>. Pass it to your worker threads, make them check whether it's true and exit if it is. When the manager thread wants to shut down, it sets it to true then joins its worker threads.
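A minimal sketch of that pattern; run_and_stop is a hypothetical name, and the sleeps stand in for real work. Relaxed ordering is enough for a pure stop signal; use Release/Acquire if the flag must also publish other data to the workers.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Spawns `n` workers that loop until a shared flag is set, then stops them.
/// Returns how many loop iterations each worker completed.
pub fn run_and_stop(n: usize) -> Vec<u64> {
    let shutdown = Arc::new(AtomicBool::new(false));
    let workers: Vec<_> = (0..n)
        .map(|_| {
            let flag = Arc::clone(&shutdown);
            thread::spawn(move || {
                let mut iterations = 0u64;
                // Check the flag once per outer-loop iteration.
                while !flag.load(Ordering::Relaxed) {
                    iterations += 1; // stand-in for one unit of real work
                    thread::sleep(Duration::from_millis(1));
                }
                iterations
            })
        })
        .collect();

    thread::sleep(Duration::from_millis(20)); // let the workers run a bit
    shutdown.store(true, Ordering::Relaxed); // manager: request shutdown...
    workers.into_iter().map(|h| h.join().unwrap()).collect() // ...then join
}
```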

Make sure your workers check it periodically, for example on every iteration of their outer loop. Or, if your outer loop is really tight, you can keep a counter so that they only check it every 100 loop iterations, to avoid being bogged down by atomic reads; tokio does something similar, where a worker thread only checks the global overflow queue every n iterations.
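A sketch of the "check every N iterations" idea; CHECK_EVERY and tight_loop are hypothetical names, and 100 is just the figure mentioned above. The trade-off is that after the flag is set, the loop may run up to CHECK_EVERY - 1 more iterations before it notices.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

/// How often the tight loop polls the flag (hypothetical tuning value).
const CHECK_EVERY: u64 = 100;

/// Runs up to `max_iters` cheap steps, reading `stop` only once every
/// CHECK_EVERY iterations. Returns the number of iterations performed.
pub fn tight_loop(stop: &AtomicBool, max_iters: u64) -> u64 {
    let mut i = 0;
    while i < max_iters {
        // The cheap modulo test short-circuits the atomic load on most iterations.
        if i % CHECK_EVERY == 0 && stop.load(Ordering::Relaxed) {
            break;
        }
        // ... one cheap unit of work would go here ...
        i += 1;
    }
    i
}
```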

If your worker threads ever block on something, make sure initiating shutdown unblocks them. For example, if they read and then process messages from the main thread through a concurrent queue, make sure that on shutdown the main thread drops the sending half of that queue after setting the shutdown bool and before joining the workers. That way, if the worker threads were bored and thus blocked on the queue, dropping the sender wakes them up; they then notice that the bool is set and/or that the queue was disconnected, and exit.
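A sketch of that shutdown order with std only; pool_shutdown is a hypothetical name. Because std's Receiver is not Clone, the workers share it through Arc<Mutex<_>> here (crossbeam-channel or flume receivers could be cloned instead). Jobs still queued when the flag flips are abandoned, so the returned count can be lower than the number of jobs sent.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;

/// Sends `jobs` messages to `n_workers` workers, then shuts them down:
/// set the flag, drop the Sender, join. Returns how many jobs were handled.
pub fn pool_shutdown(jobs: u32, n_workers: usize) -> u32 {
    let shutdown = Arc::new(AtomicBool::new(false));
    let (tx, rx) = mpsc::channel::<u32>();
    let rx = Arc::new(Mutex::new(rx));

    let workers: Vec<_> = (0..n_workers)
        .map(|_| {
            let (flag, rx) = (Arc::clone(&shutdown), Arc::clone(&rx));
            thread::spawn(move || {
                let mut done: u32 = 0;
                loop {
                    // Blocks while idle; Err once the Sender is dropped.
                    // The lock guard is released at the end of this statement.
                    let msg = rx.lock().unwrap().recv();
                    match msg {
                        Ok(_job) if !flag.load(Ordering::Acquire) => done += 1,
                        _ => break, // flag set or queue disconnected
                    }
                }
                done
            })
        })
        .collect();

    for job in 0..jobs {
        tx.send(job).unwrap();
    }
    // 1) set the flag, 2) drop the Sender to wake blocked workers, 3) join.
    shutdown.store(true, Ordering::Release);
    drop(tx);
    workers.into_iter().map(|h| h.join().unwrap()).sum()
}
```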


Or &'static AtomicBool with e.g. OnceLock.

There was also this article that was posted a few months ago. It's specific to Linux, but provides a lay of the land and helps explain why this still hasn't been generally solved: How to stop Linux threads cleanly (mazzo.li) [1].


  1. Windows has TerminateThread, but it has a huge caveat that basically boils down to "Use this only as a last resort. And if you do use it, it can leave application state undefined." It is roughly comparable to forcibly cancelling a single pthread; note that pthread_kill(thread_id, SIGKILL) would not do that, because SIGKILL terminates the whole process.


It sounds like this is a matter of implementing a shutdown() API on the channel.

  • futures-channel has it, but it only has async APIs
  • tokio-channel doesn't have it but shows that async and blocking APIs can coexist on a channel
  • flume at least has a WeakSender mechanism, which can be useful if you stash one master Sender and make all the others weak
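To illustrate what a shutdown() on the channel buys you, here is a minimal hand-rolled sketch using Mutex and Condvar. ClosableQueue is hypothetical and is not the API of any of the crates above. Any holder of the Arc, on the sending or the receiving side, can call close(), which wakes every thread blocked in recv().

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Condvar, Mutex};

/// Hypothetical closeable queue; close() may be called from either side.
pub struct ClosableQueue<T> {
    state: Mutex<(VecDeque<T>, bool)>, // (pending items, closed?)
    ready: Condvar,
}

impl<T> ClosableQueue<T> {
    pub fn new() -> Arc<Self> {
        Arc::new(ClosableQueue {
            state: Mutex::new((VecDeque::new(), false)),
            ready: Condvar::new(),
        })
    }

    /// Returns Err(item) if the queue is already closed.
    pub fn send(&self, item: T) -> Result<(), T> {
        let mut st = self.state.lock().unwrap();
        if st.1 {
            return Err(item);
        }
        st.0.push_back(item);
        self.ready.notify_one();
        Ok(())
    }

    /// Blocks until an item arrives or the queue is closed (then None).
    pub fn recv(&self) -> Option<T> {
        let mut st = self.state.lock().unwrap();
        loop {
            if st.1 {
                return None; // closed: abort semantics, pending items dropped
            }
            if let Some(item) = st.0.pop_front() {
                return Some(item);
            }
            st = self.ready.wait(st).unwrap(); // re-checks after spurious wakeups
        }
    }

    /// Marks the queue closed and wakes every blocked receiver.
    pub fn close(&self) {
        self.state.lock().unwrap().1 = true;
        self.ready.notify_all();
    }
}
```

Returning None immediately on close, rather than draining pending items first, is a design choice that matches the "abort" semantics asked for; a drain-then-None variant would swap the order of the two checks in recv().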

The alternative is indeed using recv_timeout in a loop and checking a shared flag when it wakes up.
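A minimal sketch of the recv_timeout approach; serve is a hypothetical name. Each recv_timeout call waits at most `tick`, so the flag is re-checked regularly even while other Sender clones keep the channel open.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::Duration;

/// Handles messages from `rx` until `stop` is set or all Senders are gone.
/// Returns the number of messages handled.
pub fn serve(rx: &Receiver<u32>, stop: &AtomicBool, tick: Duration) -> usize {
    let mut handled = 0;
    while !stop.load(Ordering::Acquire) {
        match rx.recv_timeout(tick) {
            Ok(_msg) => handled += 1, // real message processing would go here
            Err(RecvTimeoutError::Timeout) => {} // idle: loop round, re-check flag
            Err(RecvTimeoutError::Disconnected) => break, // all Senders dropped
        }
    }
    handled
}
```

The cost is latency: a shutdown request can take up to one `tick` to be noticed, so the timeout trades wakeup overhead against responsiveness.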

Yes. After six years of discussion, there's a PR for crossbeam-channel to implement .disconnect(). That will fix the problem.

Then the standard procedure to close out a thread would be: call .disconnect() on all of its incoming channels to unblock it, let the thread detect the error and exit, and then have the parent thread join it.


Why OnceLock? AtomicBool has a const constructor, so if you want to store it in a static, you can just do that:

```rust
use std::sync::atomic::AtomicBool;

static SHUT_DOWN: AtomicBool = AtomicBool::new(false);

fn main() {
    let a: &'static AtomicBool = &SHUT_DOWN;
}
```

If you want to create one dynamically, you can use Box::leak:

```rust
use std::sync::atomic::AtomicBool;

fn main() {
    let a: &'static AtomicBool = Box::leak(Box::new(AtomicBool::new(false)));
}
```

But I generally think it's nice to be able to create and destroy resources dynamically (including a worker thread pool) which is why I'd default to Arc unless you have a strong motivation otherwise.
