Working with Streams: Combine a vector of Streams into a single Stream of a custom type

Abstractly, I'm trying to merge a vector of streams into a single stream. I found the merge_streams crate for that. But I'm unsure how to handle the output type, Merge<VeryLongCombinatorType>. Ideally I'd like to take that stream and turn it into a custom struct, MessageStream, where MessageStream: Stream.

Here's an example:

use futures_lite::{future::block_on, prelude::*, stream};
use merge_streams::{IntoStream, MergeStreams};

fn main() {
  block_on(async {
    let a = stream::once(1);
    let b = stream::once(2);
    let c = stream::once(3);
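    // This compiles, but the type of `s` is a long combinator type I can't name:
    let mut s = vec![a, b, c].merge().into_stream();
    // What I'd like to write instead (does not compile):
    // let mut s: MyStream = vec![a, b, c].merge().into_stream();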
  })
}

struct MyStream;
impl Stream for MyStream {
  type Item = i32;

  fn poll_next(
    self: std::pin::Pin<&mut Self>,
    cx: &mut std::task::Context<'_>,
  ) -> std::task::Poll<Option<Self::Item>> {
    todo!()
  }
}

I'm not actually sure how I'd fill out the body of poll_next either. I've done some poking around, but the documentation on Streams seems sparse or not suited to this use case.

The actual use case, for the curious, is to query a vector of HTTP endpoints for streamed responses, then combine those streams into a single (unordered) stream, which will be polled continuously until the end of a protocol. For example:

		let message_streams: Vec<MessageStream> = responses
			.into_iter()
			.map(|response| {
				response.bytes_stream().filter_map(|bytes| async {
					let b = bytes.unwrap();
					let is_crap = &*b == b":\n" || &*b == b"\n";
					if !is_crap {
						// Some(Box::new(SigningMessage::try_from(b).unwrap()))
						Some(SigningMessage::try_from(b).unwrap())
					} else {
						None
					}
				})
			})
			.collect(); // NOPE
		let merged = message_streams.merge().into_stream();
		self.rx_stream = Some(merged);
		Ok(())
// Error:
// value of type `Vec<MessageStream>` cannot be built from `std::iter::Iterator<Item=futures::stream::FilterMap<impl Stream<Item = Result<rocket::http::hyper::body::Bytes, reqwest::Error>>, impl futures::Future<Output = std::option::Option<SigningMessage>>`

I'd like to know how to coerce these stream types into my custom stream struct, or at least how to better handle the opaque types in the error above.

It sounds like you're looking for futures::stream::SelectAll.

Yes, that would work for merging the streams, as would the solution above, though futures is probably more stable than a proof-of-concept crate. Maybe the title was misleading, as my issue is more

  • "what do I do with the opaque return type that I would like to turn into something convenient", rather than
  • "how do I merge streams with SelectAll or Merge or whatever else."

For example, with an opaque type like this (from the error above):
std::iter::Iterator<Item=futures::stream::FilterMap<impl Stream<Item = Result<rocket::http::hyper::body::Bytes, reqwest::Error>>, impl futures::Future<Output = std::option::Option<SigningMessage>>.

My first thought was that I might be able to turn that type into a custom Stream type, but since that seems difficult, my second thought was to use generics; but that's also somewhat ugly, since the generic will decorate every instance of my struct. So I'm here.

You can use trait objects. The futures crate defines a BoxStream type alias that's easy to use.

let stream: BoxStream<'static, i32> = Box::pin(your_stream_goes_here);

oh hell yeah, that's neat. Thanks!
