How to combine two futures::Stream into one?

My use case is to read from stdin as a stream while I type. This stream is then combined in real time with another stream that emits events at a predefined time interval. The desired result is a single stream carrying the data from both. What is the best approach for this? It needs to be cross-platform so that I can run it on macOS, Linux, or Windows. I would also like executor independence (async-std vs tokio) if possible.

This is for a console application. So I've been checking out crossterm and tui-rs.

I've also looked at https://github.com/Pauan/rust-signals, which seems promising, but given my inexperience with Rust I am unsure whether an external crate is necessary for my needs.

What's the recommended solution for that?

I guess the main question is really which Stream type you want to use: if you use ::tokio's or ::async-std's, they both provide a .merge() method with a very similar implementation, which, in and of itself, is executor-agnostic.

Now, for more generality, if you really want to be "async framework"-agnostic, then one good candidate for this could be ::futures's Stream, but it does not seem to offer any merging whatsoever, although you should be able to easily add that functionality on top of it inspired by the two implementations mentioned above.

Don't be afraid to pull in external crates, it's something very common in the Rust ecosystem :wink:


A very "simple", albeit a bit inefficient, solution using only the standard library is, since you have exactly two sources of input, to spawn two "listener threads", one for each stream. Each thread tries to read some bytes out of its Readable in a blocking fashion and, when it succeeds, sends those bytes through an mpsc::channel. Your main thread then polls / queries the receiver end of the channel.
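Here is a minimal sketch of that thread-per-source idea, using only the standard library. The `Event` enum and the `merge_sources` function are hypothetical names made up for illustration, and the input lines are passed in as a `Vec` so the example is self-contained; in a real program the first thread would presumably loop over `std::io::stdin().lock().lines()` instead.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

// Hypothetical event type: one variant per source.
#[derive(Debug, PartialEq)]
enum Event {
    Input(String),
    Tick(u32),
}

// Hypothetical helper: runs both sources on their own threads and
// returns every event in the order the main thread received it.
fn merge_sources(lines: Vec<String>, ticks: u32, period: Duration) -> Vec<Event> {
    let (tx, rx) = mpsc::channel();
    let tx_ticks = tx.clone();

    // Listener thread 1: stands in for blocking reads from stdin.
    let input = thread::spawn(move || {
        for line in lines {
            if tx.send(Event::Input(line)).is_err() {
                break; // receiver is gone
            }
        }
    });

    // Listener thread 2: emits a tick every `period`.
    let timer = thread::spawn(move || {
        for n in 0..ticks {
            thread::sleep(period);
            if tx_ticks.send(Event::Tick(n)).is_err() {
                break;
            }
        }
    });

    // The iterator ends once both senders have been dropped,
    // i.e. once both threads have finished.
    let events: Vec<Event> = rx.into_iter().collect();
    input.join().unwrap();
    timer.join().unwrap();
    events
}

fn main() {
    let lines = vec!["hello".to_string(), "world".to_string()];
    for event in merge_sources(lines, 3, Duration::from_millis(10)) {
        println!("{:?}", event);
    }
}
```

The relative order of `Input` and `Tick` events depends on thread scheduling, but events coming from the same source keep their order.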

The futures crate provides a runtime agnostic combinator called select, which does exactly this. If you don't need an actual stream, but just need a loop that pulls data out of both, you can also use the select! macro.

Thank you very much for your thorough response!

I was going for the futures::Stream solution, but I am unsure how to convert the input from stdin into a Stream. Is there a solution for that? I was looking at async-std's implementation, but it seems to be exposed as a future and not as a Stream of input characters.

Regarding rust-signals, do you have any experience with it, is it overkill or just right?

A very "simple", albeit a bit inefficient, solution using only the standard library is, since you have exactly two sources of input, to spawn two "listener threads", one for each stream

Yes, I was also thinking about that, but this is a good moment for me to learn about Rust coroutines/async/await and, in the process, end up with a more efficient solution :grinning:

What is the difference between the two? From what I can see, one is for Stream and the other for Future?

The first one requires you to have two streams, and combines them into an actual stream object. The other one allows you to wait for things to happen on arbitrary event sources, including non-streams, however it doesn't give you a stream object. You probably want the latter in this case.
