Pausing and resuming worker thread

Hello! I have a worker thread that performs work every second. I currently use an AtomicBool and an mpsc channel to pause and resume it; see the simplified code below. I am unsure whether this is correct, and I fear it could somehow get stuck in a paused state.
I cannot use park() and unpark(), as the thread handle in the real code is being join()ed. I could possibly use a Parker instead of a channel, but in this case it seems it would not make a difference.

Is the current solution good enough? Or are there issues I have not considered? Would it be better to just check the AtomicBool on every loop iteration and sleep for 1s if it is false?

use std::sync::{
    atomic::{AtomicBool, Ordering},
    mpsc, Arc,
};

pub struct WorkerThread {
    thread_active: Arc<AtomicBool>,
    unpause_channel: mpsc::Sender<()>,
}
impl WorkerThread {
    pub fn toggle_thread(&self, should_be_active: bool) {
        let is_currently_active = self.thread_active.load(Ordering::SeqCst);
        if should_be_active && !is_currently_active {
            tracing::info!("Resuming worker thread");
            self.thread_active.store(true, Ordering::SeqCst);
            self.unpause_channel.send(()).ok();
        } else if !should_be_active && is_currently_active {
            tracing::info!("Pausing worker thread");
            self.thread_active.store(false, Ordering::SeqCst);
        }
    }
}

pub fn spawn_worker() -> WorkerThread {
    let thread_active_flag = Arc::new(AtomicBool::new(false));
    let (unpause_sender, unpause_receiver) = mpsc::channel::<()>();
    let thread_active_flag_clone = thread_active_flag.clone();

    std::thread::spawn(move || {
        loop {
            while !thread_active_flag.load(Ordering::SeqCst) {
                // I fear there could be a race condition round these parts
                unpause_receiver.recv().unwrap();
            }

            // do work

            std::thread::sleep(std::time::Duration::from_secs(1));
        }
    });

    WorkerThread {
        thread_active: thread_active_flag_clone,
        unpause_channel: unpause_sender,
    }
}
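
For comparison, here is a minimal sketch of the polling alternative I mention above. The `stop` flag, the `ticks` counter and the function name are my own placeholders and not part of the real code. The worker never blocks on a wakeup, so it cannot miss one, but resuming can take up to one full `period` to be noticed.

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread::JoinHandle;
use std::time::Duration;

/// Spawns a worker that polls `active` once per `period`.
/// `stop` (hypothetical) lets the owner end the loop so the thread can be joined.
/// `ticks` (hypothetical) stands in for the real work by counting iterations.
fn spawn_polling_worker(
    active: Arc<AtomicBool>,
    stop: Arc<AtomicBool>,
    ticks: Arc<AtomicUsize>,
    period: Duration,
) -> JoinHandle<()> {
    std::thread::spawn(move || {
        while !stop.load(Ordering::SeqCst) {
            if active.load(Ordering::SeqCst) {
                // do work
                ticks.fetch_add(1, Ordering::SeqCst);
            }
            // Sleep whether paused or not; a resume is seen on the next iteration.
            std::thread::sleep(period);
        }
    })
}
```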

One problem with using AtomicBool for synchronization is that it's not signalable: you can't put an atomic bool into a select.

So if, in "do work", you want logic like "wait until that other piece of work is done OR my thread is paused", you won't be able to do that.

So usually you want to use a channel rather than an atomic bool. Specifically, Receiver<Infallible> can be thought of as a signaling bool: Infallible is an empty enum, so the only information you can get out of this receiver is that the sender was closed (dropped).
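
To make the "signaling bool" idea concrete, here is a small sketch using only std. The function names are made up for illustration. While the sender is alive, `try_recv` reports `Empty`; once it is dropped, `try_recv` reports `Disconnected`.

```rust
use std::convert::Infallible;
use std::sync::mpsc::{self, Receiver, TryRecvError};

/// Reports whether the paired `Sender<Infallible>` is still alive.
/// No value of type `Infallible` can exist, so the `Ok` arm is unreachable.
fn token_is_live(token: &Receiver<Infallible>) -> bool {
    match token.try_recv() {
        Ok(never) => match never {},
        Err(TryRecvError::Empty) => true,
        Err(TryRecvError::Disconnected) => false,
    }
}

/// Hypothetical demo: returns the token state before and after dropping the sender.
fn demo_token() -> (bool, bool) {
    let (sender, token) = mpsc::channel::<Infallible>();
    let before = token_is_live(&token);
    drop(sender);
    let after = token_is_live(&token);
    (before, after)
}
```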

So, something like this:

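use std::convert::Infallible;
use std::sync::mpsc::{self, Receiver, Sender, TryRecvError};
use std::thread::JoinHandle;

struct WorkerThread {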
    tokens: Sender<Receiver<Infallible>>,
    active_token: Option<Sender<Infallible>>,
    thread: JoinHandle<()>, // TODO: join on Drop
}

impl WorkerThread {
    pub fn spawn() -> WorkerThread {
        let (tokens_sender, tokens_receiver) = mpsc::channel::<Receiver<Infallible>>();
        let handle = std::thread::spawn(move || {
            for token in tokens_receiver {
                // Loop while we have a token.
                loop {
                    match token.try_recv() {
                        Ok(never) => match never {},
                        Err(TryRecvError::Empty) => {}
                        // Token is no more, go to the outer loop to refresh it.
                        Err(TryRecvError::Disconnected) => break,
                    }

                    // Pass the token onto the work for finer-grained cancellation.
                    do_work(&token);
                }
            }
        });

        WorkerThread {
            tokens: tokens_sender,
            active_token: None,
            thread: handle,
        }
    }

    pub fn toggle(&mut self, should_be_active: bool) {
        if should_be_active && self.active_token.is_none() {
            let (token_sender, token_receiver) = mpsc::channel::<Infallible>();
            self.active_token = Some(token_sender);
            // Sending fails only if the worker thread has already exited.
            self.tokens.send(token_receiver).ok();
        }
        if !should_be_active && self.active_token.is_some() {
            // Dropping the sender disconnects the token, which pauses the worker.
            self.active_token = None;
        }
    }
}

fn do_work(token: &Receiver<Infallible>) {

}
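
As one possible way to use the token inside the work itself, here is a sketch of a sleep that ends early when the thread is paused. std's mpsc has no select, so this relies on `recv_timeout` instead. `sleep_unless_paused` and the demo function are hypothetical names, not something from the code above.

```rust
use std::convert::Infallible;
use std::sync::mpsc::{self, Receiver, RecvTimeoutError};
use std::time::Duration;

/// Waits for up to `period`, returning early if the token is revoked.
/// Returns `true` if the worker is still active afterwards.
fn sleep_unless_paused(token: &Receiver<Infallible>, period: Duration) -> bool {
    match token.recv_timeout(period) {
        Ok(never) => match never {},
        // The full period elapsed and the sender is still alive.
        Err(RecvTimeoutError::Timeout) => true,
        // The sender was dropped, i.e. the worker was paused.
        Err(RecvTimeoutError::Disconnected) => false,
    }
}

/// Hypothetical demo: the second wait returns at once because the token is gone.
fn demo_interruptible_sleep() -> (bool, bool) {
    let (sender, token) = mpsc::channel::<Infallible>();
    let first = sleep_unless_paused(&token, Duration::from_millis(10));
    drop(sender);
    let second = sleep_unless_paused(&token, Duration::from_secs(60));
    (first, second)
}
```

Replacing the one-second `std::thread::sleep` with a call like this would let a pause take effect without waiting for the sleep to finish.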

I won't deny that the Sender<Receiver<Infallible>> type makes me chuckle :)

Note as well how clean shutdown falls out naturally here: when WorkerThread is dropped, both active_token and tokens get closed, and the worker thread exits.
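
The "join on Drop" TODO could be filled in roughly like this. It is a sketch under my own assumptions: the `Worker` type, `resume`, and the `exited` flag are hypothetical, and the fields are wrapped in `Option` so that both senders can be closed before joining (otherwise the join would wait forever).

```rust
use std::convert::Infallible;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{self, Receiver, Sender, TryRecvError};
use std::sync::Arc;
use std::thread::JoinHandle;
use std::time::Duration;

struct Worker {
    tokens: Option<Sender<Receiver<Infallible>>>,
    active_token: Option<Sender<Infallible>>,
    thread: Option<JoinHandle<()>>,
}

impl Worker {
    /// `exited` (hypothetical) is set by the thread just before it returns.
    fn spawn(exited: Arc<AtomicBool>) -> Worker {
        let (tokens, tokens_receiver) = mpsc::channel::<Receiver<Infallible>>();
        let thread = std::thread::spawn(move || {
            // The outer loop ends once the `tokens` sender is dropped.
            for token in tokens_receiver {
                // The inner loop ends once the current token is revoked.
                while !matches!(token.try_recv(), Err(TryRecvError::Disconnected)) {
                    std::thread::sleep(Duration::from_millis(5)); // stand-in for work
                }
            }
            exited.store(true, Ordering::SeqCst);
        });
        Worker { tokens: Some(tokens), active_token: None, thread: Some(thread) }
    }

    fn resume(&mut self) {
        if self.active_token.is_none() {
            let (sender, receiver) = mpsc::channel::<Infallible>();
            if let Some(tokens) = &self.tokens {
                if tokens.send(receiver).is_ok() {
                    self.active_token = Some(sender);
                }
            }
        }
    }
}

impl Drop for Worker {
    fn drop(&mut self) {
        // Close both channels first so the worker leaves both loops.
        self.active_token = None;
        self.tokens = None;
        if let Some(thread) = self.thread.take() {
            let _ = thread.join();
        }
    }
}
```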

I would recommend using a watch channel. You can use a bool for the message type. Watch channels come with functionality to wait for the value to change.
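
The standard library has no watch channel, so as a rough illustration of the same idea (a shared bool plus a way to wait for it to change), here is a std-only sketch built on Mutex and Condvar. This is a stand-in and not the API of any watch channel crate; `PauseFlag` and its methods are hypothetical names.

```rust
use std::sync::{Arc, Condvar, Mutex};

/// A shared "active" flag that a worker can block on.
#[derive(Clone)]
struct PauseFlag {
    inner: Arc<(Mutex<bool>, Condvar)>,
}

impl PauseFlag {
    fn new(active: bool) -> Self {
        PauseFlag { inner: Arc::new((Mutex::new(active), Condvar::new())) }
    }

    /// Updates the flag and wakes every waiter so it can re-check the value.
    fn set_active(&self, active: bool) {
        let (lock, cvar) = &*self.inner;
        *lock.lock().unwrap() = active;
        cvar.notify_all();
    }

    /// Blocks while the flag is false. `wait_while` re-checks the condition
    /// after every wakeup, so spurious wakeups are handled.
    fn wait_until_active(&self) {
        let (lock, cvar) = &*self.inner;
        let guard = lock.lock().unwrap();
        let _guard = cvar.wait_while(guard, |active| !*active).unwrap();
    }
}
```

The worker would call `wait_until_active()` at the top of each loop iteration, and the controlling side would call `set_active(true)` or `set_active(false)`.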

I have been pointed to the atomic-wait crate, and it seems to be exactly what I am looking for. What do you all think about this? Now the check of the value and the wait happen as one atomic step.

use std::sync::{
    atomic::{AtomicU32, Ordering},
    Arc,
};

pub struct WorkerThread {
    thread_active: Arc<AtomicU32>,
}
const PAUSED: u32 = 0;
const ACTIVE: u32 = 1;
impl WorkerThread {
    pub fn toggle_thread(&self, should_be_active: bool) {
        let is_currently_active = self.thread_active.load(Ordering::SeqCst) == ACTIVE;

        if should_be_active && !is_currently_active {
            tracing::info!("Resuming worker thread");
            self.thread_active.store(ACTIVE, Ordering::SeqCst);
            atomic_wait::wake_all(&*self.thread_active);
        } else if !should_be_active && is_currently_active {
            tracing::info!("Pausing worker thread");
            self.thread_active.store(PAUSED, Ordering::SeqCst);
        }
    }
}

pub fn spawn_worker() -> WorkerThread {
    let thread_active_flag = Arc::new(AtomicU32::new(PAUSED));

    {
        let thread_active_flag = thread_active_flag.clone();
        std::thread::spawn(move || {
            loop {
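                // `wait` blocks while the value is PAUSED (0), but it may also
                // return spuriously, so re-check the flag in a loop.
                while thread_active_flag.load(Ordering::SeqCst) == PAUSED {
                    atomic_wait::wait(&thread_active_flag, PAUSED);
                }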

                // do work

                std::thread::sleep(std::time::Duration::from_secs(1));
            }
        });
    }

    WorkerThread {
        thread_active: thread_active_flag,
    }
}