Stream demultiplexing / routing

I'm running into a bit of trouble with stream handling. I'm able to merge a number of incoming streams using the try_stream! macro from async_stream and produce a single output stream. Now what I need to do is split or route a single incoming stream of items to several output streams, where each item only goes to one output. I can't quite figure it out. This is what I have so far:

/// fans out items from a single stream to a number of other streams with non-overlapping key ranges
struct StreamRouter {
    /// critical assumption that the ranges do not overlap. Stored key is the non-inclusive upper bound of the range
    routes: BTreeMap<Key, tokio::sync::mpsc::Sender<Result<Item>>>,
}

impl StreamRouter {
    fn stream_range(&mut self, upper_bound: Key) -> impl Stream<Item = Result<Item>> {
        let (tx, mut rx) = tokio::sync::mpsc::channel(1);
        self.routes.insert(upper_bound, tx);
        try_stream! {
            while let Some(value) = rx.recv().await {
                yield value?;
            }
        }
    }

    async fn route(&self, stream: impl Stream<Item = Result<Item>>) -> Result<()> {
        tokio::pin!(stream);
        while let Some(result) = stream.next().await {
            let item = result?;
            let (_, tx) = self.routes.range(item.key.clone()..).next().unwrap();
            tx.send(Ok(item));
        }
        Ok(())
    }
}

To use this, I'll create a StreamRouter struct, call stream_range for each Key range I want to route, and that will return a Stream. Once it's all set up, I'll call stream(input_stream) which will return when the input stream runs out of items.

I think this will work for my use case, but it feels like an abuse of mpsc channels. What's the "right" way to solve this problem?

To quote the documentation of tokio's mpsc channel:

This channel is also suitable for the single-producer single-consumer use-case. (Unless you only need to send one message, in which case you should use the oneshot channel.)

This describes your pattern, a single producer (your StreamRouter) and a single consumer (the receiving end of the channel you route your message on). So I wouldn't worry too much about using channels for the use-case you described. Also, I believe using tokio's built-in types will give you the best performance (unless for very specific and niche use-cases, which yours isn't) and the lowest risk of implementing avoidable runtime errors such as deadlocks. So I'd think twice before trying to implement my own message passing.

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.