Single consumer async channel with lagging

I am looking for a single (or multi) producer single consumer channel with async receive that drops old messages if the receiver is too slow (like tokio::sync::broadcast but single consumer).
The problem I'm trying to solve is the following: I have one (ideally many) main thread that sends messages (e.g. state updates) and a worker thread that does something very time-consuming with them (e.g. saves them somewhere).

It does not matter if some of the messages are dropped; I only need eventual consistency. A lagging mpsc channel with any capacity would be great, but I'm fine with capacity 1 (is that even a channel any more?).
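
To make the capacity-1 case concrete, here is a rough sketch of the semantics I mean, using only the standard library. It blocks instead of being async, so it is not what I actually need, and LatestSlot and its methods are names I made up for illustration:

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// Hypothetical single-slot "channel": a newer message overwrites an unread one.
struct LatestSlot<T> {
    slot: Mutex<Option<T>>,
    ready: Condvar,
}

impl<T> LatestSlot<T> {
    fn new() -> Self {
        LatestSlot { slot: Mutex::new(None), ready: Condvar::new() }
    }

    /// Stores `msg` and returns the unread message it replaced, if any.
    fn send(&self, msg: T) -> Option<T> {
        let old = self.slot.lock().unwrap().replace(msg);
        self.ready.notify_one();
        old
    }

    /// Blocks until a message is available, then moves it out of the slot.
    fn recv(&self) -> T {
        let mut guard = self.slot.lock().unwrap();
        loop {
            if let Some(msg) = guard.take() {
                return msg;
            }
            guard = self.ready.wait(guard).unwrap();
        }
    }
}

fn main() {
    let chan = Arc::new(LatestSlot::new());
    // Three updates arrive before the worker gets around to receiving.
    for update in ["v1", "v2", "v3"] {
        chan.send(String::from(update));
    }
    let worker = {
        let chan = Arc::clone(&chan);
        thread::spawn(move || chan.recv())
    };
    // The worker only ever sees the newest update; the older two were dropped.
    let latest = worker.join().unwrap();
    println!("worker saw: {latest}");
}
```

The receiver gets an owned T, so it can mutate it or move out of it freely, which is the part I can't get from broadcast.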

Is there any library out there that does that? My honorable mentions:

  • tokio::sync::broadcast would fit, but my messages can't be cloned and I don't want to put them into an Arc because mutable access is required and I need to be able to move out of it. Arc<Mutex<Option<T>>> works but doesn't seem right.
  • tokio::sync::mpsc is what I am currently using, but I send an atomic bool with every message to tell the worker that the message can be skipped, which seems really ugly to me.

How about a watch channel?

As far as I understand the docs, it neither gives mutable access nor allows moving the value out. It has exactly the same problems as broadcast.

You could wrap it in the atomic-take container to move it out. Besides that, you could define a type like this:

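struct LagChan<T> {
    msg: std::sync::Mutex<Option<T>>,
    on_send: tokio::sync::Notify,
}

impl<T> LagChan<T> {
    fn send(&self, msg: T) {
        // Overwrite any unread message, then release the lock before waking the receiver.
        let mut lock = self.msg.lock().unwrap();
        *lock = Some(msg);
        drop(lock);
        self.on_send.notify_one();
    }

    async fn recv(&self) -> T {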
        loop {
            if let Some(msg) = self.msg.lock().unwrap().take() {
                return msg;