How do I fan out an async stream into unbounded substreams?

I need to fork a stream whose items are tagged with a name and carry some content.

There are multiple subscribers and only one publisher. So basically, I need SPMC: a single async stream multiplexed into multiple substreams. In other words, multicasting.

Each worker reads from the stream in parallel and ignores any message whose tag doesn't match its own name, so it only processes items that share its tag. Multiple workers can have the same tag as well.
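The tag-matching rule described above can be sketched as a plain filter. `Tagged` and `for_worker` are illustrative names I made up, not from any crate:

```rust
// Illustrative message type: each item carries a tag and a payload.
#[derive(Clone, Debug, PartialEq)]
struct Tagged {
    tag: String,
    payload: String,
}

// A worker keeps only the items whose tag matches its own name;
// several workers may share a name and thus apply the same filter.
fn for_worker<'a>(
    name: &'a str,
    items: impl Iterator<Item = Tagged> + 'a,
) -> impl Iterator<Item = Tagged> + 'a {
    items.filter(move |m| m.tag == name)
}

fn main() {
    let msgs = vec![
        Tagged { tag: "a".into(), payload: "one".into() },
        Tagged { tag: "b".into(), payload: "two".into() },
        Tagged { tag: "a".into(), payload: "three".into() },
    ];
    // Worker "a" sees only the two items tagged "a".
    let mine: Vec<_> = for_worker("a", msgs.into_iter()).collect();
    assert_eq!(mine.len(), 2);
}
```

In the async version the same predicate would sit behind each worker's receiving end, applied to every item that arrives.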

I cannot use a bounded broadcast channel because the upstream channel is unbounded. This means I cannot predict the backpressure, but I cannot drop messages to the substreams either.

I tried Flume, Loole, and Kanal, but calling them MPMC is a misnomer: they only guarantee that exactly one receiver gets each message, so they don't push to all substreams. In other words, they are not broadcast channels.

I tried barrage, broadcaster, and circulate, and they are almost spot on, except the first two have memory-leak problems and the last one requires Serde.

I'm looking for something more lightweight: efficient, tiny, and without the risk of a large memory leak building up over time.

An unbounded broadcast channel implies potentially excessive memory usage, and quite quickly at that. There are also performance implications from dynamically resizing the queue while messages are being processed. For these reasons, I don't think this kind of queue is commonly implemented.

For example, Tokio provides a bounded broadcast channel; the crate documentation explains how queue overflow (lagging receivers) is handled. Given that the bounded capacity can be set as high as usize::MAX / 2, which is about 2 billion messages on a 32-bit system (about 2^63 on 64-bit), it should be enough for most purposes.

If a very minimal unbounded queue is what you're after, you could also consider rolling a simple solution yourself.

