I've skimmed through StreamExt in futures_util::stream - Rust but not found it.
Let s
be a stream of Future<T>
Is there some (builtin) task we can tokio::spawn to get a stream of T ?
I've skimmed through StreamExt in futures_util::stream - Rust but not found it.
Let s
be a stream of Future<T>
Is there some (builtin) task we can tokio::spawn to get a stream of T ?
use futures::future::{Future, FutureExt};
use futures::stream::{Stream, StreamExt};
pub fn flatten_future_stream<S, F, T>(s: S) -> impl Stream<Item = T>
where
S: Stream<Item = F>,
F: Future<Output = T>,
{
s.flat_map(|f| f.into_stream())
}
There's no tokio::spawn
needed. Stream transforms almost always need no tokio::spawn
.
edit: @nerditation 's solution is much better.
There’s also some nuances as to whether or not you want the futures to be executed serially, or partially or fully concurrently. I don't remember the Stream
API off the top of my head, but there’s different methods for such things.
If you want concurrency, the buffered
and buffer_unordered
methods provide that - they poll up to N futures concurrently, giving you a stream of values.
So @nerditation's then
solution can be written with these methods as:
let flat_stream = stream.buffered(1);
which has at most 1 future being polled concurrently, If you want more concurrency, increase the 1
to a larger number; if ordering doesn't matter, use buffer_unordered
instead of buffered
to "sort" the outputs by first future to complete.