Async - what machinery would you use for this?

Hi all, my use case is that I have a consumer which wants to be notified when there is something to do. I have a producer which produces work to do. All simple so far. Work to do is indicated by a sequential ticket id (which is an incrementing usize: 0, 1, 2, 3, 4 ...)

HOWEVER:

  • tickets must be consumed in sequence, not in parallel
  • the producer doesn't produce the work, it only signifies that there is work to do by announcing the ticket id
  • it generates this much quicker than the consumer can consume, so by the time the consumer has finished consuming 0 the ticket id might be 10
  • the consumer "swallows" all the tickets that have arrived since the last time, so if another 10 tickets arrive while it is consuming ticket 0, it will next consume tickets 1..=10 in one go (or 1..=15 if 5 more have arrived by then ;-))
  • the consumer should pounce as soon as there is work to do

I know of the following ways I could do this:

message queue

Have the consumer subscribe to a message queue<usize> which the producer populates. The consumer will then while loop on receiving, thus being notified "immediately" once a new ticket comes in. However, if another 10 tickets arrive whilst it is processing ticket 0, it is going to receive ticket 1, and then ticket 2, etc.

(This is fine in reality because on receiving ticket 1 it will actually consume 1..=1+n (e.g. 1..=1+10) and then ignore any later messages for tickets it has already processed.)
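
One way to get the "swallow everything since last time" behaviour out of a plain queue is to block for the first ticket and then drain whatever else has piled up without blocking. A minimal sketch using the blocking std::sync::mpsc channel rather than an async one (recv_latest and demo_batches are made-up names for illustration, not part of any library):

```rust
use std::sync::mpsc::{channel, Receiver};

/// Blocks until at least one ticket is queued, then drains everything that is
/// already waiting and returns the highest ticket id seen.
/// Returns None once the producer has hung up and the queue is empty.
fn recv_latest(rx: &Receiver<usize>) -> Option<usize> {
    // Block for the first ticket so the consumer wakes as soon as work exists.
    let mut latest = rx.recv().ok()?;
    // Swallow whatever else has piled up, without blocking.
    while let Ok(ticket) = rx.try_recv() {
        latest = latest.max(ticket);
    }
    Some(latest)
}

/// Hypothetical driver: the producer announces tickets 0..=10 before the
/// consumer gets going, so the consumer handles them as a single batch.
fn demo_batches() -> Vec<(usize, usize)> {
    let (tx, rx) = channel::<usize>();
    for ticket in 0..=10 {
        tx.send(ticket).expect("receiver is still alive");
    }
    drop(tx); // hang up so the consumer loop below terminates

    let mut batches = Vec::new();
    let mut next = 0; // first ticket not yet processed
    while let Some(latest) = recv_latest(&rx) {
        batches.push((next, latest)); // stand-in for "process next..=latest"
        next = latest + 1;
    }
    batches
}
```

Calling demo_batches() from main and printing the result shows the whole backlog collapsing into one batch. The queue still stores every ticket, so this only hides the serialisation from the consumer rather than removing it.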

polling some Arc<Mutex<usize>>

The consumer will loop on "if *ticket.lock()? > last_seen { ... last_seen = do_work() ... }; sleep_some()" and the producer will correspondingly update the ticket as new work comes in.

This is probably the simplest, but spin locking never feels good.

What I want to do is have a queue of 1, where new pushes to the queue overwrite the contents rather than block. Something like an ergonomic reusable future maybe, but I can't find anything. The queues I have found are either:

  • bounded, so a queue of 1 will block the producer when full
  • unbounded

Both will serialise the tickets to the consumer, which isn't what I want.

Is there such a beast? A single blocking resource which can be overwritten with a new value?
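
To pin down the semantics being asked for, here is a rough blocking (non-async) sketch of such a slot built from std's Mutex and Condvar. It is only a stand-in to illustrate "overwrite on write, block on read", not a suggestion that this is how any async library implements it. LatestTicket, publish, wait_newer and demo are hypothetical names.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// A one-value slot: publishing overwrites the contents, waiting blocks until
/// the contents move past what the caller has already seen.
struct LatestTicket {
    latest: Mutex<Option<usize>>,
    changed: Condvar,
}

impl LatestTicket {
    fn new() -> Self {
        LatestTicket { latest: Mutex::new(None), changed: Condvar::new() }
    }

    /// Producer side: overwrite the slot and wake the consumer.
    /// There is no queue to fill up, so this never waits for the consumer.
    fn publish(&self, ticket: usize) {
        *self.latest.lock().unwrap() = Some(ticket);
        self.changed.notify_one();
    }

    /// Consumer side: sleep (no polling) until a ticket newer than
    /// `last_seen` is in the slot, then return it.
    fn wait_newer(&self, last_seen: Option<usize>) -> usize {
        let guard = self.latest.lock().unwrap();
        // None sorts below every Some(_), so the first ticket always counts as newer.
        let guard = self
            .changed
            .wait_while(guard, |latest| *latest <= last_seen)
            .unwrap();
        (*guard).expect("slot holds a ticket once it is newer than last_seen")
    }
}

/// Hypothetical driver: one producer thread announces tickets 0..=10 while the
/// consumer records each batch as (first, last) until ticket 10 is handled.
fn demo() -> Vec<(usize, usize)> {
    let slot = Arc::new(LatestTicket::new());
    let producer = {
        let slot = Arc::clone(&slot);
        thread::spawn(move || {
            for ticket in 0..=10 {
                slot.publish(ticket);
            }
        })
    };

    let mut batches = Vec::new();
    let mut last_seen = None;
    while last_seen != Some(10) {
        let latest = slot.wait_newer(last_seen);
        let first = last_seen.map_or(0, |t| t + 1);
        batches.push((first, latest)); // stand-in for "process first..=latest"
        last_seen = Some(latest);
    }
    producer.join().unwrap();
    batches
}
```

How many batches demo() returns depends on thread timing, but they always cover 0..=10 contiguously.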

Thanks!

I'm not sure I understand this requirement. Are you saying that the consumer should:

  • process only the last ticket that arrived (ignoring any previous ones)

or

  • process all the tickets that arrived in the meantime, in bulk

?

The second, but only up to a particular batch size. I'm not concerned with the batching itself, it's more the "do I poll or do I block when the producer is expected to race ahead and I'm only interested in the latest?"

I think you can avoid spinlocking for Arc<Mutex<usize>> by using tokio::sync::Notify, with the producer notifying the consumer whenever the ticket is updated.

What I want to do is have a queue of 1, where new pushes to the queue overwrite the contents rather than block.

This sounds to me like tokio::sync::watch.

Thanks both @SkiFire13 and @jwodder. I knew there had to be something, I just couldn't find it! Hope you are all having a great Christmas break!