This is still unreasonably hard to do. Any progress in the last 3 years?
You can close the sender end(s) to force an EOF, and that happens implicitly on a Drop. The idea is to abort waits from a drop, have the thread exit, join to the thread in the Drop, clean up and exit the thread.
But if there are threads with channels in both directions, no order of drops will work. There's a deadlock.
You can't kick a .recv() out of a wait if you only have the Receiver. And if you have a clone of the Sender,
the channel won't close until all the clones have been dropped. So shutting things down from the receiver end is very tough.
What would help is an "abort" function for a Receiver, one which would both get any thread waiting on "recv()" an error return. Are there any channel crates which implement that?
I don't quite understand what you mean by bidirectional pipe? Theoretically, a thread always belongs to another thread, that is to say, the thread also has ownership, and only the thread with ownership can close the child thread. A bidirectional pipe sounds like a circular reference
Regardless of whether it is an asynchronous function or not, the concept is similar. If your threads are computational threads, it might be better to use a mechanism like fork-join. This is similar to how cuda is used
You do not explain exactly what you need to do before the program terminates.
In my case, I have a process that asynchronously writes data to the database (not with async keyword, just a regular thread ) and I wait for it to complete ( using a mutex which is locked when it is working ). However other threads which may be running can just be terminated by program exiting.
Failing that, a global "everybody shut down" flag will work. Any standard way to do that?
Sure, an Arc<AtomicBool>. Pass it to your worker threads, make them check whether it's true and exit if it is. When the manager thread wants to shut down, it sets it to true then joins its worker threads.
Make sure your workers are checking it periodically, such as something along the lines of every iteration of their outer loop. Or, if your outer loop is really right, you can do something like make a counter so that they check it every 100 loop iterations to avoid being bogged down by an atomic read--tokio does something similar where a worker thread only checks the global overflow queue every n iterations.
If your worker threads ever block on something, make sure initiating shut down unblocks them. For example, if they read then process messages from the main thread through a concurrent queue structure, make sure the main thread upon initiating shut down drops the sending half of that queue after setting the shut-down bool and before joining them--that way, if the worker threads were bored and thus blocking on the queue, the dropping of the sender half will wake them up, then they'll notice that the bool is set to true and/or notice that the message queue was dropped and exit.
There was also this article that was posted a few months ago. It's specific to Linux, but provides a lay of the land and helps explain why this still hasn't been generally solved: How to stop Linux threads cleanly (mazzo.li)[1].
Windows has TerminateThread but it has a huge caveat that basically boils down to "Use this only as a last resort. And if you do use it, it can leave application state undefined." Roughly equivalent to pthread_kill(thread_id, SIGKILL). ↩︎
Yes. After six years of discussion, there's a PR for crossbeam-channel to implement .disconnect(). That will fix the problem.
Then, standard procedure to close out a thread should be to do .disconnect() on all incoming channels, to unblock them, let the thread detect the error and exit, and then the parent thread can do a join.
Why OnceLock? AtomicBool has a constant constructor, so if you want to store it in a static you can just, do that
use std::sync::atomic::AtomicBool;
static SHUT_DOWN: AtomicBool = AtomicBool::new(false);
fn main() {
let a: &'static AtomicBool = &SHUT_DOWN;
}
If you want to create one dynamically you can use Box::leak
use std::sync::atomic::AtomicBool;
fn main() {
let a: &'static AtomicBool = Box::leak(Box::new(AtomicBool::new(false)));
}
But I generally think it's nice to be able to create and destroy resources dynamically (including a worker thread pool) which is why I'd default to Arc unless you have a strong motivation otherwise.