Single consumer async channel with lagging

I am looking for a single (or multi) producer single consumer channel with async receive that drops old messages if the receiver is too slow (like tokio::sync::broadcast but single consumer).
The problem I'm trying to solve is the following: I have one main thread (ideally many) that sends messages (e.g. state updates) and a worker thread that does something very time-consuming with them (e.g. saves them somewhere).

It does not matter if some of the messages are dropped; I only need eventual consistency. A lagging mpsc channel with any capacity would be great, but I'm fine with capacity 1 (is that even a channel any more?).

Is there any library out there that does that? My honorable mentions:

  • tokio::sync::broadcast would fit, but my messages can't be cloned and I don't want to put them into an Arc because mutable access is required and I need to be able to move out of it. Arc<Mutex<Option<T>>> works but doesn't seem right.
  • tokio::sync::mpsc is what I am currently using, but I send an atomic bool with every message to tell the worker that this message can be skipped, which seems really ugly to me.

How about a watch channel?

As far as I understand the docs, it neither gives mutable access nor allows moving the value out, so it has exactly the same problems as broadcast.

You could wrap the message in the atomic-take container so that it can be moved out. Besides that, you could define a type like this:

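/// Single-slot "channel": a new message overwrites any unread one.
struct LagChan<T> {
    msg: std::sync::Mutex<Option<T>>,
    on_send: tokio::sync::Notify,
}

impl<T> LagChan<T> {
    fn send(&self, msg: T) {
        // Replace whatever is in the slot; an unread message is dropped.
        let mut lock = self.msg.lock().unwrap();
        *lock = Some(msg);
        drop(lock);
        // notify_one stores a permit if no task is waiting yet,
        // so the wakeup is not lost.
        self.on_send.notify_one();
    }

    async fn recv(&self) -> T {
        loop {
            // The lock guard is a temporary and is released before the await below.
            if let Some(msg) = self.msg.lock().unwrap().take() {
                return msg;
            }
            self.on_send.notified().await;
        }
    }
}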

I had to add code to allow closing the channel, but other than that, the LagChan definitely works. Thank you!
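The closing code itself is not shown in this thread. As a rough sketch of one way it could look: the type below keeps a closed flag next to the message slot and makes recv return None once the channel is closed and drained. To stay runnable with only the standard library, it stores the receiver's Waker by hand instead of using tokio::sync::Notify, so it is a swapped-in technique rather than the code referred to above. The names ClosableLagChan, close and block_on are assumptions made up for this example, and block_on is only a tiny demo executor (under tokio you would simply await recv).

```rust
use std::future::{poll_fn, Future};
use std::pin::pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Shared state: the newest unread message, the closed flag, and the
/// waker of a receiver that is currently waiting (if any).
struct State<T> {
    msg: Option<T>,
    closed: bool,
    waker: Option<Waker>,
}

/// Hypothetical closable variant of `LagChan` (name and API are assumptions).
pub struct ClosableLagChan<T> {
    state: Mutex<State<T>>,
}

impl<T> ClosableLagChan<T> {
    pub fn new() -> Self {
        Self { state: Mutex::new(State { msg: None, closed: false, waker: None }) }
    }

    /// Overwrites any unread message; hands the message back if closed.
    pub fn send(&self, msg: T) -> Result<(), T> {
        let mut state = self.state.lock().unwrap();
        if state.closed {
            return Err(msg);
        }
        state.msg = Some(msg);
        let waker = state.waker.take();
        drop(state); // release the lock before waking the receiver
        if let Some(w) = waker {
            w.wake();
        }
        Ok(())
    }

    /// Marks the channel closed; a message already in the slot can still be received.
    pub fn close(&self) {
        let mut state = self.state.lock().unwrap();
        state.closed = true;
        let waker = state.waker.take();
        drop(state);
        if let Some(w) = waker {
            w.wake();
        }
    }

    /// Resolves to the newest message, or `None` once closed and drained.
    pub async fn recv(&self) -> Option<T> {
        poll_fn(|cx| {
            let mut state = self.state.lock().unwrap();
            if let Some(msg) = state.msg.take() {
                return Poll::Ready(Some(msg));
            }
            if state.closed {
                return Poll::Ready(None);
            }
            // Nothing to deliver yet: remember who to wake on send/close.
            state.waker = Some(cx.waker().clone());
            Poll::Pending
        })
        .await
    }
}

/// Waker that unparks the thread running `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Minimal executor for demonstration only: polls, then parks until woken.
pub fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}
```

With this shape, sending 1 and then 2 before the worker wakes up means recv yields only 2, and after close the worker gets None and can exit its loop. Returning the message from a failed send is a design choice of this sketch, so that a message that can't be cloned is not silently lost.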

Besides that, why is there no lagging single consumer channel (when there is a multi consumer one)?

There are lots of different channels you could add.
