I'm running into a bit of trouble with stream handling. I'm able to merge a number of incoming streams using the try_stream!
macro from async_stream
and produce a single output stream. Now what I need to do is split or route a single incoming stream of items to several output streams, where each item only goes to one output. I can't quite figure it out. This is what I have so far:
/// fans out items from a single stream to a number of other streams with non-overlapping key ranges
struct StreamRouter {
/// critical assumption that the ranges do not overlap. Stored key is the non-inclusive upper bound of the range
routes: BTreeMap<Key, tokio::sync::mpsc::Sender<Result<Item>>>,
}
impl StreamRouter {
fn stream_range(&mut self, upper_bound: Key) -> impl Stream<Item = Result<Item>> {
let (tx, mut rx) = tokio::sync::mpsc::channel(1);
self.routes.insert(upper_bound, tx);
try_stream! {
while let Some(value) = rx.recv().await {
yield value?;
}
}
}
async fn route(&self, stream: impl Stream<Item = Result<Item>>) -> Result<()> {
tokio::pin!(stream);
while let Some(result) = stream.next().await {
let item = result?;
let (_, tx) = self.routes.range(item.key.clone()..).next().unwrap();
tx.send(Ok(item));
}
Ok(())
}
}
To use this, I'll create a StreamRouter
struct, call stream_range
for each Key range I want to route, and that will return a Stream. Once it's all set up, I'll call stream(input_stream)
which will return when the input stream runs out of items.
I think this will work for my use case, but it feels like an abuse of mpsc channels. What's the "right" way to solve this problem?