Heartbeat in a thread, done right

Hello experts,

I'm trying to figure out how to write a simple application which has a thread sending a heartbeat signal to some service. When going out of scope, I would like to tell the thread that it can now stop and terminate. I was planning to do this via channels.

So, why am I writing here? Basically I worked my way through the entire rust book and believe that I understood most of it, though of course I'm lacking some practice. I would like to show the code I have and get some feedback from experts as to what to change to make it a) compile :smiley: and b) correct in terms of architecture.

So here is my code. I basically implemented a struct called Node which some other code can make an instance of. This struct will connect to a Rest Service and in its background, it should make a heartbeat call every few seconds. The Node has a startup() function to start the heartbeat and a shutdown() function to stop it. I stripped my example code down to the minimum required to get the pattern explained.

pub struct Node {
    heartbeat_handle: Option<thread::JoinHandle<()>>,
    tx: mpsc::Sender<()>,
    rx: mpsc::Receiver<()>
}

impl Node {
    pub fn new() -> Node {
        let (tx, rx) = mpsc::channel();
        Node {
            heartbeat_handle: None,
            tx: tx,
            rx: rx
        }
    }

    pub fn startup(&mut self) {
        self.heartbeat_handle = Some(thread::spawn(move || {Node::heartbeat(self.rx)}));
    }

    pub fn shutdown(&mut self) {
        self.tx.send(());
    }

    fn heartbeat(rx: mpsc::Receiver<()>) {
        loop {
            println!("heart ... beat ...");

            match rx.try_recv() {
                Ok(_) | Err(mpsc::TryRecvError::Disconnected) => {
                    println!("Terminating.");
                    break;
                },
                Err(mpsc::TryRecvError::Empty) => {}
            }

            thread::sleep(time::Duration::from_secs(10))
        }
    }
}

That's the compressed version of my code. The error I get is:

error[E0477]: the type `[closure@src/node.rs:131:52: 131:86 self:&mut node::Node]` does not fulfill the required lifetime
   --> src/node.rs:131:38
    |
131 |         self.heartbeat_handle = Some(thread::spawn(move || {Node::heartbeat(self.rx)}));
    |                                      ^^^^^^^^^^^^^
    |
    = note: type must satisfy the static lifetime

I have to admit that the part about lifetimes seems most complicated to me, but I really cant figure out how to get this working and if it is architecturally good if I would get it working somehow.

Any help would be really appriciated! Thanks! :slight_smile:
Christian

The closure you pass to thread::spawn must not have any references (except for 'static), which is what the compiler is complaining about. The code is referencing the rx field - the move doesn't do anything because you cannot move that field out of the Node.

The easiest, although somewhat ugly, fix is to put the rx field in an Option, and then take the channel out in startup:

pub struct Node {
    heartbeat_handle: Option<thread::JoinHandle<()>>,
    tx: mpsc::Sender<()>,
    rx: Option<mpsc::Receiver<()>>,
}

impl Node {
    pub fn new() -> Node {
        let (tx, rx) = mpsc::channel();
        Node {
            heartbeat_handle: None,
            tx: tx,
            rx: Some(rx),
        }
    }

    pub fn startup(&mut self) {
        let rx = self.rx.take().unwrap();
        self.heartbeat_handle = Some(thread::spawn(|| Self::heartbeat(rx)));
    }
 ...

This is ugly because the rx field is "transient" storage for something you intend to move out of there after the Node is constructed. I assume you don't want to incorporate startup straight into new because either it's not ready to start heartbeating or a created Node may not have its startup called again (i.e. something cancels this workflow)?

A better design would be some arrangement where you setup the rx/tx channels and move the rx into the background thread right there and then - no need to store the Receiver temporarily to pass it to the thread later on. In other words, you want a one-step startup/bootstrap, and not a two-step init (first new, then startup).

1 Like

Thank you very much for the detailed answer. I took your code and modified it slightly. What do you think about this. Would you consider it ugly?

pub struct Node {
    heartbeat_handle: Option<thread::JoinHandle<()>>,
    tx: Option<mpsc::Sender<()>>,
}

impl Node {
    pub fn new() -> Node {
        Node {
            heartbeat_handle: None,
            tx: None
        }
    }

    pub fn startup(&mut self) {
        let (tx, rx) = mpsc::channel();
        self.tx = Some(tx);
        self.heartbeat_handle = Some(thread::spawn(move || {Node::heartbeat(rx)}));
    }

    pub fn shutdown(&mut self) {
        if let Some(ref tx) =  self.tx {
            tx.send(());
        }
    }

    fn heartbeat(rx: mpsc::Receiver<()>) {
        loop {
            println!("heart ... beat ...");

            match rx.try_recv() {
                Ok(_) | Err(mpsc::TryRecvError::Disconnected) => {
                    println!("Terminating.");
                    break;
                },
                Err(mpsc::TryRecvError::Empty) => {}
            }

            thread::sleep(time::Duration::from_secs(10))
        }
    }
}

I removed the rx part from the node struct and set the tx as option, where I only set an actual value inside startup, which is when the channel gets created.

what do you think?
(it compiles and works :slight_smile: )

1 Like

I think that's better :slight_smile:. But let me ask you - what's the reason for not setting it up in the new function so you don't need the Option wrappers? That is, why is startup a separate method?

A couple of other random points/suggestions:

  1. You should implement Drop for Node which calls shutdown. In fact, if shutdown is terminal (i.e. you don't startup again), you can probably get rid of the shutdown and just rely on dropping the Node to issue the shutdown message.
  2. Is there a reason you have thread::sleep in the heartbeat loop? Why not use the (blocking) recv method?
1 Like

I've had to solve a similar problem and found a slightly different way of doing things which works quite well. In this case, running an expensive task in a background thread (don't want to block the GUI) and passing the result back to the main thread when it finishes.

I created a TaskHandle which contains the sender side of a channel and a closure which it will invoke repeatedly. Every time the closure returns the TaskHandle will send back a Message::Heartbeat message to the monitoring process. I then implemented Drop on TaskHandle and made it send one final Message::Quit before it gets dropped (which also closes that Sender, by the way).

(playground)

use std::thread;
use std::time::Duration;
use std::sync::mpsc::{self, Sender};

#[derive(Debug, Copy, Clone)]
enum Message {
    Heartbeat,
    Quit,
}

fn monitor_task<F: FnMut() + Send + 'static>(func: F) {
    let (tx, rx) = mpsc::channel();

    let task = TaskHandle::new(func, tx);
    thread::spawn(move || task.run());

    for message in rx {
        println!("Received {:?}", message);
        
    }
}

struct TaskHandle<F> {
    chan: Sender<Message>,
    func: F,
}

impl<F: FnMut() + Send + 'static> TaskHandle<F> {
    fn new(func: F, chan: Sender<Message>) -> TaskHandle<F> {
        TaskHandle { func, chan }
    }

    fn run(mut self) {
        for _ in 0..5 {
            (self.func)();
            self.chan.send(Message::Heartbeat).unwrap();
        }
    }
}

impl<F> Drop for TaskHandle<F> {
    fn drop(&mut self) {
        let _ = self.chan.send(Message::Quit);
    }
}

fn main() {
    monitor_task(|| thread::sleep(Duration::from_millis(100)));
}

This actually feels like a great place to use futures. Your thread could continuously call some_future.poll(), sending back a heartbeat if it's not finished, or the result (or an error) once the future completes.

1 Like

Hm, I needed to seperate new() and startup() because there is some more stuff that can be done without actually starting the connection. But maybe this gets merged when we refactor the architecture a bit. I'm trying to keep it as simple as possible.

In fact I already implemented the drop trait, so: good you mentioned this. I'm on the right track :slight_smile:

The sleep is needed because I want the heartbeat towards the server only to happen every 10sec or so. If I took a blocking recv, the loop would block until the exit signal is received, which means there is no more heartbeat. Or do you mean the rx.recv_timeout method, which would wait for a certain amount of time? This is also a good idea.

1 Like

That is also a good idea. I will definately look into using futures, maybe that's a nice alternative! Thank you!

Yes sorry, I meant recv_timeout - no idea why I wrote recv.

To solve a similar problem, I used an Arc<AtomicBool> instead of the channel. I think it's more efficient, but I'm not sure

1 Like

I'm using recv_timeout now, this is a lot better, because the thread will be stopped instantly upon receiving the signal through the channel. In my previous code it was waiting for the thread.sleep. Thanks a lot!

1 Like