I'm looking for a channel to pass messages. The channel should:
- be asynchronous (i.e. non-blocking)
- allow receivers to be added and removed at any time
- send each message to every receiver that is currently attached/listening
- provide back-pressure, i.e. if one receiver is slow, the sender should wait and be woken up later (through `async`)
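The fan-out and back-pressure semantics listed above can be sketched, in blocking form only, with one bounded queue per receiver. This is a minimal illustration built on `std::sync::mpsc::sync_channel`; the `FanOut` type and its methods are hypothetical names, not the API of any crate mentioned here, and the sender blocks its thread instead of yielding, so the sketch does not meet the asynchronous requirement:

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::sync::{Arc, Mutex};

/// Hypothetical fan-out sender: one bounded queue per attached receiver.
#[derive(Clone)]
pub struct FanOut<T> {
    outputs: Arc<Mutex<Vec<SyncSender<T>>>>,
    capacity: usize,
}

impl<T: Clone> FanOut<T> {
    pub fn new(capacity: usize) -> Self {
        FanOut { outputs: Arc::new(Mutex::new(Vec::new())), capacity }
    }

    /// Attach a new receiver; it only sees messages sent after this call.
    /// Dropping the returned `Receiver` detaches it again.
    pub fn subscribe(&self) -> Receiver<T> {
        let (tx, rx) = sync_channel(self.capacity);
        self.outputs.lock().unwrap().push(tx);
        rx
    }

    /// Deliver a clone of `msg` to every attached receiver.
    /// Blocks while any receiver's queue is full (back-pressure) and
    /// forgets receivers that have been dropped.
    /// Simplification: the lock is held while blocking, so `subscribe`
    /// also waits until the slow receiver catches up.
    pub fn send(&self, msg: T) {
        let mut outputs = self.outputs.lock().unwrap();
        outputs.retain(|tx| tx.send(msg.clone()).is_ok());
    }
}
```

An asynchronous variant would keep the same structure but needs a bounded queue whose send operation suspends the task rather than the thread.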
I have looked at various existing channels, of which none seems to fit:
- `tokio::sync::broadcast` doesn't provide backpressure
- `bus` isn't async and wastes CPU as of yet
- `multiqueue::BroadcastSender` doesn't allow adding new receivers, which can only be done through `multiqueue::BroadcastReceiver::add_stream`, but if I store a `BroadcastReceiver` to later add more receivers (but don't use that receiver), the process will block forever (which is why the example drops the original receiver, but then I can't add more receivers easily)
My use case is `radiorust::flow`, which I would like to extend to provide backpressure.
I would also be interested to learn how I can implement such a channel myself (using atomic operations?), but I feel like it's not easy to do properly, especially since tasks also need to be woken up.
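As a possible starting point for a hand-written version, the sketch below keeps all state behind a `Mutex` instead of using atomics and stores `Waker`s so that a waiting task can be woken. Everything in it is an assumption made for illustration: the names `Sender`, `Receiver` and `subscribe`, the restriction to a single sender, and the per-receiver queues. Closing the channel, multiple senders and reuse of detached slots are left out, and wakers are invoked while the lock is held, which a more careful implementation would avoid:

```rust
use std::collections::VecDeque;
use std::future::{poll_fn, Future};
use std::sync::{Arc, Mutex};
use std::task::{Poll, Waker};

/// Queue and parked task of one attached receiver.
struct Slot<T> {
    queue: VecDeque<T>,
    waker: Option<Waker>,
}

struct Shared<T> {
    capacity: usize,
    slots: Vec<Option<Slot<T>>>, // `None` marks a detached receiver
    send_waker: Option<Waker>,   // sender parked on a full queue
}

pub struct Sender<T> {
    shared: Arc<Mutex<Shared<T>>>,
}

pub struct Receiver<T> {
    shared: Arc<Mutex<Shared<T>>>,
    id: usize,
}

impl<T: Clone> Sender<T> {
    pub fn new(capacity: usize) -> Self {
        assert!(capacity > 0, "each receiver needs room for at least one message");
        let shared = Shared { capacity, slots: Vec::new(), send_waker: None };
        Sender { shared: Arc::new(Mutex::new(shared)) }
    }

    /// Attach a receiver; it sees only messages sent after this call.
    pub fn subscribe(&self) -> Receiver<T> {
        let mut s = self.shared.lock().unwrap();
        s.slots.push(Some(Slot { queue: VecDeque::new(), waker: None }));
        Receiver { shared: self.shared.clone(), id: s.slots.len() - 1 }
    }

    /// Resolves once `msg` has been queued for every attached receiver.
    pub fn send(&mut self, msg: T) -> impl Future<Output = ()> + '_ {
        let shared = &self.shared;
        poll_fn(move |cx| {
            let mut guard = shared.lock().unwrap();
            let s = &mut *guard;
            let cap = s.capacity;
            if s.slots.iter().flatten().any(|slot| slot.queue.len() >= cap) {
                // Back-pressure: park until a receiver pops or detaches.
                s.send_waker = Some(cx.waker().clone());
                return Poll::Pending;
            }
            for slot in s.slots.iter_mut().flatten() {
                slot.queue.push_back(msg.clone());
                if let Some(w) = slot.waker.take() {
                    w.wake();
                }
            }
            Poll::Ready(())
        })
    }
}

impl<T> Receiver<T> {
    /// Resolves with the next message queued for this receiver.
    pub fn recv(&mut self) -> impl Future<Output = T> + '_ {
        let (shared, id) = (&self.shared, self.id);
        poll_fn(move |cx| {
            let mut guard = shared.lock().unwrap();
            let s = &mut *guard;
            let slot = s.slots[id].as_mut().expect("slot exists while the receiver is alive");
            match slot.queue.pop_front() {
                Some(msg) => {
                    // Room was freed, so let a parked sender retry.
                    if let Some(w) = s.send_waker.take() {
                        w.wake();
                    }
                    Poll::Ready(msg)
                }
                None => {
                    slot.waker = Some(cx.waker().clone());
                    Poll::Pending
                }
            }
        })
    }
}

impl<T> Drop for Receiver<T> {
    fn drop(&mut self) {
        let mut s = self.shared.lock().unwrap();
        s.slots[self.id] = None; // detach, discarding unread messages
        if let Some(w) = s.send_waker.take() {
            w.wake();
        }
    }
}
```

Inside an async task this would be used as `tx.send(msg).await` and `rx.recv().await`. The sender proceeds only when every attached receiver has room, so the slowest receiver sets the pace, and dropping a receiver releases a sender that was waiting on it.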