Implementing a debouncer

Hi folks. I am in my third or fourth week of learning Rust (just on weekends), and so far I've been going through the official Rust book. My background is primarily in higher-level languages (Scala, Python, Java, Objective-C). My reason for learning Rust is not so much real systems programming; I work at a higher abstraction level writing services and applications. However, I want a language where I can write code once and interoperate with it from Java and Obj-C. Rust seems to fit the bill, and it seems to prevent me from shooting myself in the face (coughs in C++).

Ask
I am seeking feedback on the code (further below) as it's written now. I'm also seeking feedback on how to complete this implementation (it is not yet complete).

Requirements
I want to write a debouncer function. The API should be something like this:

// Initializer – create the debouncer
Debouncer(val timeout_in_milliseconds) => Debouncer
// API to call a function
void execute(val callbackFunction);
// API to gracefully shutdown
void terminate();

I want the semantics to be as follows. I can call the execute function any number of times. If a subsequent call occurs within the timeout window at time t, then the debouncer should wait to execute the callbackFunction until t + timeout_in_milliseconds.

Random Thoughts
I'm really liking the message-passing abstraction provided by Rust out of the box. I wrote a version of this in C++ last week, and now I'm trying to do it in Rust on top of the mpsc functionality. I'm also starting to understand the borrow checker, although not fully just yet.

Implementation

This implementation has the guts of a debouncer, but it's not fully there yet. The general idea is as follows –

  1. I am using the "new" pattern to construct and return an instance, i.e. Debouncer::new().
  2. The instance spawns a new thread. This thread listens for a Signal to execute some function, Signal::Execute or to gracefully shutdown, Signal::Terminate.
  3. Once Signal::Execute is received via MPSC, a separate receiver uses the recv_timeout API to block for up to the Timeout duration. This receiver listens for a message (just an integer) indicating that another call to the debouncer was made – meaning we want to "debounce", i.e. reset the clock before executing.
  4. The Err match for recv_timeout simply means that we timed out waiting for a subsequent call, meaning we're good to go ahead and execute whatever logic that needed to be debounced.
use std::time::{Duration, Instant};
use std::sync::mpsc;
use std::thread;

enum Signal {
    Execute,
    Terminate
}

struct Debouncer {
    timeout: Duration,
    last_invoked: Option<Instant>,
    signal_sender: mpsc::Sender<Signal>,
    debounce_sender: mpsc::Sender<u32>,
    thread: Option<thread::JoinHandle<()>>
}

impl Debouncer {
    pub fn new(timeout: Duration) -> Self {
        let (signal_sender, signal_receiver) = mpsc::channel();
        let (debounce_sender, debounce_receiver) = mpsc::channel();

        let thread = thread::spawn(move || 'outer: loop {
            let signal = signal_receiver.recv().unwrap();
            'inner: loop {
                let debounce = debounce_receiver.recv_timeout(timeout);
                match debounce {
                    Ok(_) => {
                        println!("[Debouncing]");
                    },
                    Err(_) => { 
                        match signal {
                            Signal::Execute => {
                                println!("[Executing]");
                                break 'inner;
                            },
                            Signal::Terminate => {
                                break 'outer;
                            }
                        }
                    }
                }
            }
        });
        Debouncer {
            timeout,
            last_invoked: None,
            signal_sender,
            debounce_sender,
            thread: Some(thread)
        }
    }

    pub fn execute<T>(&mut self, task: T) where T: FnOnce() + Send + 'static {
        let now = Instant::now();
        if self.last_invoked.is_none() {
            self.signal_sender.send(Signal::Execute).unwrap();
        } else if now.duration_since(self.last_invoked.unwrap()) > self.timeout {
            self.signal_sender.send(Signal::Execute).unwrap();
        } else {
            self.debounce_sender.send(1).unwrap();
        }
        self.last_invoked = Some(now);
    }

    pub fn terminate(self) {
        self.signal_sender.send(Signal::Terminate).unwrap();
        if let Some(thread) = self.thread {
            let result = thread.join();
            match result {
                Ok(_) => println!("Debouncer thread joined."),
                Err(_) => println!("Debouncer thread could not join.")
            }
        }
    }
}

fn main() {
    let mut debouncer = Debouncer::new(Duration::from_millis(1000));
    for _ in 0..5 {
        thread::sleep(Duration::from_millis(500));
        debouncer.execute(|| {
            println!("Hello world!");
        });
    }
    debouncer.terminate();
}

As you can see, this is not complete. The closure that's passed into the execute API is not actually called.

The pattern I think I need to implement is something like this:

  1. The debouncer needs a "task" property in its struct, something like an Arc<Mutex<T>>. This is essentially the closure.
  2. On calls to execute, the task property should be updated.
  3. Meanwhile, the spawned thread holds a reference to this same property. When it's ready to execute the code, it will invoke the task.

The key thing is that this task property needs to be mutable, as it will change each time the execute function is called.

Is there a more idiomatic approach?

Why not send the closure as a message in the channel? You are more or less implementing a kind of actor.

Hello Alice, many thanks for the response!

This is what I originally considered doing. I didn't, mainly in order to learn. Let me clarify – I want to find out whether what I'm trying to do is feasible by "sharing memory" as opposed to sending a message, i.e. whether Rust allows this without resorting to unsafe, which would put me back in what essentially feels like C++ land.

But if I were to use the message channel, then conceivably whenever I call self.debounce_sender.send(1) in the execute method, I could (on the next line) send the closure over the message channel.

And within my spawned thread, whenever debounce_receiver.recv_timeout doesn't time out (meaning self.debounce_sender.send(1) was called), the Ok match arm could receive the updated closure to call after the next loop iteration.

This is likely more idiomatic than sharing.

Actually, after reading and re-reading about a bunch of traits, including FnOnce, FnMut, Fn, Send, and Sync, I was able to write some logic to see how the closure could exist as a property in a struct and get updated by one or more threads while both threads are able to execute the closure.

use std::sync::{Arc, Mutex};
use std::time::Duration;
use std::thread;

type Func = Box<dyn FnMut() + Send + Sync>;

struct Container {
    pub func: Func
}

impl Container {
    pub fn new(func: Func) -> Self {
        Container {
            func
        }
    }
    pub fn execute(&mut self) {
        (self.func)();
    }
    pub fn update(&mut self, func: Func) {
        self.func = func;
    }
}

fn main() {
    let meow = || {
        println!("Meow!");
    };
    let woof = || {
        println!("Woof!");
    };

    let shared_container = Arc::new(Mutex::new(Container::new(Box::new(meow))));
    let shared_container_clone = Arc::clone(&shared_container);

    let handle = thread::spawn(move || {
        for i in 0..5 {
            thread::sleep(Duration::from_millis(500));
            let mut container = shared_container.lock().unwrap();
            println!("[Iteration {}]", i);
            container.execute();
        }
    });

    thread::sleep(Duration::from_secs(2));
    {
        let mut container = shared_container_clone.lock().unwrap();
        container.update(Box::new(woof));
        container.execute();
    }
    handle.join().unwrap();
}

Thank you for the link on the actor approach. I will give that a read.

Generally I find it more readable to put the Arc/Mutex inside a struct:

use std::sync::{Arc, Mutex};

type Func = Box<dyn FnMut() + Send + Sync>;

#[derive(Clone)]
pub struct Container {
    pub func: Arc<Mutex<Func>>,
}

impl Container {
    pub fn new(func: Func) -> Self {
        Container {
            func: Arc::new(Mutex::new(func)),
        }
    }
    pub fn execute(&self) {
        let mut lock = self.func.lock().unwrap();
        (*lock)();
    }
    pub fn update(&self, func: Func) {
        let mut lock = self.func.lock().unwrap();
        *lock = func;
    }
}

The code that uses it becomes a lot simpler this way:

use std::thread;
use std::time::Duration;

fn main() {
    let meow = || {
        println!("Meow!");
    };
    let woof = || {
        println!("Woof!");
    };

    let shared_container = Container::new(Box::new(meow));
    let shared_container_clone = shared_container.clone();

    let handle = thread::spawn(move || {
        for i in 0..5 {
            thread::sleep(Duration::from_millis(500));
            println!("[Iteration {}]", i);
            shared_container_clone.execute();
        }
    });

    thread::sleep(Duration::from_secs(2));
    shared_container.update(Box::new(woof));
    shared_container.execute();
    handle.join().unwrap();
}

Ah! I agree this is a cleaner approach. Can you help me understand #[derive(Clone)]?

I see that using the derive attribute, we get some basic implementation of the Clone trait. I found the relevant pages in the documentation:

But from those pages, it's not clear how this default implementation works. I also printed this:

println!("{:p} {:p}", &shared_container, &shared_container_clone);
...
-----------------------------
0x7ffeefbff210 0x7ffeefbff218

I understood this to mean that shared_container and shared_container_clone are indeed two different objects.

  1. Is it that the clone implementation performed a deep clone operation of some kind, whereby each field in Container (in this case just func) is also cloned?
  2. If Container had some field with another custom type, would that be cloned too (recursively)?
  3. And if [2] is true, does it automatically detect that because func is of type Arc<Mutex<Func>> it should clone func by calling Arc::clone(&T)?

Regarding question [3] – from the docs, I understood that on Arc, I have to call Arc::clone(&T). Then I'm free to transfer ownership of the object that's returned (to another thread). But that seems to have happened here automagically.

The code generated by #[derive(Clone)] will just call clone on the fields in the struct. So since the only field is an Arc, it will call the Arc::clone method, which returns a new Arc to the same shared object.

The addresses are different because the references are pointing at the Arc objects, not the shared value inside the Arc.

The Clone implementation will usually do a deep clone, but it doesn't for an Arc, because Arc has a custom Clone impl that produces a new handle to the same shared object.

Yes, it calls clone on each field. However, it would not call clone on things inside the Arc.

The clone is indeed necessary to move it to another thread. You don't have to clone it with Arc::clone(&t) or Container::clone(&t). Calling .clone() would also work in both cases and is equivalent.