Futures: cloning / forking streams

Hi, newbie here!

I've been toying around with futures, just to get a basic understanding of it, but I can't really figure out how to "clone" a stream or how to reuse it for multiple separate pipelines.

I'd like to do something similar to this graph:

                                             CORE/ENGINE
[events]---------.----------(pipeline 1)-------|  P  |
                 |----------(pipeline 2)-------|  O  |
                 |----------(pipeline 3)-------|  L  |
                 \----------(pipeline 4)-------|  L  |

where I can filter the event stream and do the processing separately, or at least describe it. Currently all I was able to do is use map() to keep the transformed and the original value in the pipeline, but it gets really messy once I need to keep track of many transformations.

Is there a way to broadcast the polled value to multiple objects? I was looking at Shared but as far as I understand it's more like an async switching mechanism between pipelines.

// now
let transforms = Events::new()
    .map(|e| ( e,  pipe1(e),  pipe2(e),  pipe3(e)) );

// idealy
let events = Events::new();
let pipe1 = events.clone()
    .filter(...).map(...);
let pipe2 = events.clone()
    .whatever(...);

Here's a gist on what I've tried, but I'm not sure if this is the correct approach...

Don't have too much to add, but you may find https://github.com/carllerche/futures-broadcast/blob/master/src/lib.rs interesting.

Also, I wonder if you can make use of Sink and Stream::forward. You'd write a custom sink that a stream is forwarding events to, and the sink would internally maintain multiple futures::sync::mpsc::Sender/Receiver pairs, one for each pipeline. I've not thought too hard about this, so it may be a half baked idea :slight_smile:.

1 Like

Ah, thanks! I've only checked crates.io. However it looks a bit overkill for my use case but perfect as a reference.

And, yes. I want to do something very similar to your idea, just couldn't figure out how to use Sinks yet.