Stream of Future<T> to stream of T

I've skimmed through StreamExt in futures_util::stream - Rust but not found it.

Let s be a stream of Future<T>

Is there some (builtin) task we can tokio::spawn to get a stream of T ?

use futures::future::{Future, FutureExt};
use futures::stream::{Stream, StreamExt};

pub fn flatten_future_stream<S, F, T>(s: S) -> impl Stream<Item = T>
where
    S: Stream<Item = F>,
    F: Future<Output = T>,
{
    s.flat_map(|f| f.into_stream())
}

There's no tokio::spawn needed. Stream transforms almost always need no tokio::spawn.

edit: @nerditation 's solution is much better.

3 Likes

There’s also some nuances as to whether or not you want the futures to be executed serially, or partially or fully concurrently. I don't remember the Stream API off the top of my head, but there’s different methods for such things.

also check then

let flat_stream = stream.then(|x|x);
6 Likes

If you want concurrency, the buffered and buffer_unordered methods provide that - they poll up to N futures concurrently, giving you a stream of values.

So @nerditation's then solution can be written with these methods as:

let flat_stream = stream.buffered(1);

which has at most 1 future being polled concurrently, If you want more concurrency, increase the 1 to a larger number; if ordering doesn't matter, use buffer_unordered instead of buffered to "sort" the outputs by first future to complete.

2 Likes

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.