Channels with way to stop?

I want to run one thread and send Box<FnOnce()> for execution.
Background thread will peek up one by one Box<FnOnce()> and execute them.

Sounds like std::sync::mpsc or crossbeam,
but I have no idea how to stop them gracefully.

If user want to quit my program I want background thread to process only current job and then exit.
So I need clear operation for channel to clear it and then send "quit" message.
Or I need some kind of priority for messages, so I just send "quit" message with the highest priority and background thread peek up it first.

Is any crate for this?

The easiest thing in this case (I think) would be to have another channel—or perhaps an atomic somewhere—that tells the worker when to stop. Before checking the work queue, the worker must check the status channel/atomic to see if it should stop.

use ::std::*;

fn main ()
{
    type Task<'a> = Box<dyn FnOnce() + Send + 'a>;
    
    let (task_tx, task_rx) = ::crossbeam::channel::unbounded::<Task>();
    
    let (quit_tx, quit_rx) = ::crossbeam::channel::bounded::<()>(1);
    
    ::crossbeam::thread::scope(|scope| {
        // Slave
        scope.spawn(|_| {
            let thread_id = thread::current().id();
            println!("<{:?}> Slave started", thread_id);
            loop {
                use ::crossbeam::channel::TryRecvError;
                match quit_rx.try_recv() {
                    | Ok(()) => {
                        println!("<{:?}> Received QUIT message", thread_id);
                        break;
                    },
                    | Err(TryRecvError::Empty) => {},
                    | Err(err) => {
                        panic!("quit_tx dropped: {}", err);
                    },
                }
                match task_rx.try_recv() {
                    | Ok(task) => {
                        task()
                    },
                    | Err(TryRecvError::Empty) => {
                        thread::yield_now();
                    },
                    | Err(err) => {
                        panic!("task_tx dropped: {}", err);
                    },
                }
            }
        });
<ThreadId(2)> Slave started
<ThreadId(3)> Master started
<ThreadId(3)> Sending task 0
<ThreadId(2)> Doing task 0
<ThreadId(3)> Sending task 1
<ThreadId(3)> Sending task 2
<ThreadId(2)> Doing task 1
<ThreadId(3)> Sending task 3
<ThreadId(2)> Doing task 2
<ThreadId(3)> Sending task 4
<ThreadId(3)> Sending task 5
<ThreadId(2)> Doing task 3
<ThreadId(3)> Sending task 6
<ThreadId(3)> Sending task 7
<ThreadId(2)> Doing task 4
<ThreadId(3)> Sending QUIT message
<ThreadId(3)> Sending task 8
<ThreadId(3)> Sending task 9
<ThreadId(2)> Received QUIT message
1 Like

May be I don't understand, but this looks like CPU hog, if queue is empty?

Although something like thread::park() could be better than this, provided the master thread unparks it when sending a task, thread::yield_now purpose is precisely this: to avoid busy-looping:

That will avoid taking over the scheduler, sure. But it still interferes with power management by keeping the CPU awake.

To select from one of two channels without busy waiting, use the select macro: crossbeam::channel::select - Rust.

1 Like

Shared Arc<AtomicBool> which is checked after a closure is received from the channel seems like the simplest solution here. Usual termination condition when channel is closed if there are no senders should be used to signal cancelation if the channel is empty.

An alternative solution is to add a second channel and a select!, but that seems more complex and doesn’t guarantee. The order you want. If you add a quite channel, don’t send a dummy () message, just drop a sender instead. Dropping a sender is a valid selectable way to transfer information, which is guaranteed to work even in presence of panics, and to work only once.

2 Likes

This topic was automatically closed 90 days after the last reply. New replies are no longer allowed.