An equivalent of `mergeAll` from Rx observables for streams?

This post is basically a repost of an issue from 2017: An equivalent of `mergeAll` from Rx observables for streams? · Issue #640 · rust-lang/futures-rs · GitHub

In Rx, there's a method on observables called mergeAll that transforms a stream of streams into a stream that yields results from any of the underlying streams as they are available. The flatten method seems close, but it imposes ordering such that results of a given child stream only become available when the streams before it are completed.

I'm looking to build a multiplexed request handler that transforms a stream of incoming requests to a stream of responses to all of those requests and writes them to a sink. The only way I have found so far is to spawn a new future for each request in a for_each callback that writes the stream of responses to an unsync channel, then read from the channel outside of the for_each . It feels like it could be more elegant. Am I missing something.

Given the current futures library, I guess what I'd be looking for would be called buffered_flatten() and/or buffered_flatten_unordered(). Those don't exist, but is there a pattern I can implement to allow work to begin on later streams before the earlier streams complete?

Tokio provides a StreamMap. Additionally, building a mergeAll on top of FuturesUnordered seems reasonably easy.

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.