Send/fork/clone a single stream into two sinks?

I have one impl Stream<Item=Cloneable> and would like to turn it into two independent impl Stream<Item=Cloneable>s that each yield the same items.

How can I do that using std futures?

The tricky problems:

  • I'd like backpressure, i.e. a finite amount of buffering. The source should be polled only as fast as both destinations can manage to read.
  • If one of the streams is dropped, it should not harm the other one.
  • If both streams are dropped, the source should be dropped as well.

I have a hunch that it will require one or more async mpsc channels and sinks, but I'm not sure exactly how to put these together.

The easiest approach is to spawn a task that forwards the items, using bounded channels for backpressure.

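// Assumes tokio 1.x with its bounded mpsc channel, plus the futures crate.
use futures::{Stream, StreamExt};
use std::pin::Pin;
use tokio::sync::mpsc::{channel, Sender};

// `stream` is the source stream and `cap` is the buffer size of each channel.
let (send1, recv1) = channel(cap);
let (send2, recv2) = channel(cap);
tokio::spawn(async move {
    tokio::pin!(stream);
    while let Some(item) = stream.next().await {
        let item2 = item.clone();
        if send1.send(item2).await.is_err() {
            // recv1 was dropped: deliver this item to recv2, then keep
            // forwarding to recv2 alone.
            if send2.send(item).await.is_ok() {
                forward(stream, send2).await;
            }
            return;
        }
        if send2.send(item).await.is_err() {
            // recv2 was dropped: keep forwarding to recv1 alone.
            forward(stream, send1).await;
            return;
        }
    }
});

// Forwards the rest of `stream` into `sink` until the stream ends or the
// receiver is dropped. Returning drops the source stream.
async fn forward<S: Stream>(mut stream: Pin<&mut S>, sink: Sender<S::Item>) {
    while let Some(item) = stream.next().await {
        if sink.send(item).await.is_err() {
            return;
        }
    }
}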

playground
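
For anyone who wants to look at the forwarding logic on its own, without an async runtime, here is a minimal synchronous sketch of the same idea using only the standard library. It is an analogue, not the async solution: a thread and std::sync::mpsc::sync_channel stand in for the spawned task and the async bounded channels, an Iterator stands in for the Stream, and the name fork is made up for illustration.

```rust
use std::sync::mpsc::{sync_channel, Receiver};
use std::thread;

/// Hypothetical helper (not from any crate): fans `source` out into two
/// bounded channels on a background thread. `cap` is the buffer size of
/// each channel.
fn fork<I>(source: I, cap: usize) -> (Receiver<I::Item>, Receiver<I::Item>)
where
    I: Iterator + Send + 'static,
    I::Item: Clone + Send + 'static,
{
    let (tx1, rx1) = sync_channel(cap);
    let (tx2, rx2) = sync_channel(cap);
    thread::spawn(move || {
        // A slot becomes `None` once its receiver has been dropped.
        let mut outputs = [Some(tx1), Some(tx2)];
        for item in source {
            for slot in outputs.iter_mut() {
                // `send` blocks while the buffer is full (backpressure) and
                // fails only if the receiving side is gone.
                let gone = slot
                    .as_ref()
                    .map_or(false, |tx| tx.send(item.clone()).is_err());
                if gone {
                    *slot = None;
                }
            }
            if outputs.iter().all(Option::is_none) {
                // Both receivers were dropped: stop, which drops `source`.
                return;
            }
        }
    });
    (rx1, rx2)
}
```

Something like fork(1..=5, 2) gives two receivers that each see every item. Because the buffers are bounded, the two receivers need to be drained concurrently (or one of them dropped); reading one to the end while ignoring the other stalls once the ignored buffer fills up, which is the backpressure behaviour asked for in the question.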

It's a bit more difficult if you need something that doesn't spawn (and is therefore executor-agnostic). I had a bit of fun with that and came up with this.

I wrote a crate to solve exactly this problem (well, the more generalised pub/sub form of it). Something like this should do what you want:

use flo_stream::*;

let mut publisher = Publisher::new(1);
let output_stream1 = publisher.subscribe();
let output_stream2 = publisher.subscribe();
let when_done = publisher.send_all(input_stream);

// ...

when_done.await;

Flowbetween uses streams everywhere for sending events around and general communication between components, so I had a particular need for a straightforward API for doing this kind of thing.
