Abstractly, I'm trying to merge a vector of streams into a single stream. I found the merge_streams crate for that. But I'm unsure how to handle the output type, Merge<VeryLongCombinatorType>. Ideally I'd like to take that stream and turn it into a custom struct, MessageStream, where MessageStream: Stream.
Here's an example:
use futures_lite::{future::block_on, prelude::*, stream};
use merge_streams::{IntoStream, MergeStreams};

fn main() {
    block_on(async {
        let a = stream::once(1);
        let b = stream::once(2);
        let c = stream::once(3);
        let mut s = vec![a, b, c].merge().into_stream();
        // Nonononono
        let mut s: MyStream = vec![a, b, c].merge().into_stream();
    })
}

struct MyStream;

impl Stream for MyStream {
    type Item = i32;

    fn poll_next(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        todo!()
    }
}
I'm not actually sure how I'd fill out the body of poll_next either. I've done some poking around, but the documentation on Streams seems sparse or not suited to this use case.
The actual use case, for the curious, is to query a vector of HTTP endpoints for some streamed responses, then to combine those streams into a single (unordered) stream, which will be polled continuously until the end of a protocol. For example, a snippet:
let message_streams: Vec<MessageStream> = responses
    .into_iter()
    .map(|response| {
        response.bytes_stream().filter_map(|bytes| async {
            let b = bytes.unwrap();
            let is_crap = &*b == b":\n" || &*b == b"\n";
            if !is_crap {
                // Some(Box::new(SigningMessage::try_from(b).unwrap()))
                Some(SigningMessage::try_from(b).unwrap())
            } else {
                None
            }
        })
    })
    .collect(); // NOPE
let merged = message_streams.merge().into_stream();
self.rx_stream = Some(merged);
Ok(())
// Error:
// value of type `Vec<MessageStream>` cannot be built from `std::iter::Iterator<Item=futures::stream::FilterMap<impl Stream<Item = Result<rocket::http::hyper::body::Bytes, reqwest::Error>>, impl futures::Future<Output = std::option::Option<SigningMessage>>`
I'd like to know how to coerce these stream types into my custom stream struct, or at least how to better handle the opaque types named in the above error.
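To make the goal concrete, here is a minimal sketch of the shape I have in mind, assuming type erasure through a boxed trait object is acceptable. It uses only std: the local Stream trait is a stand-in for futures_core::Stream, and Once, Countdown, NoopWaker, drain and demo are hypothetical names invented for illustration. The idea is that MessageStream holds a Pin<Box<dyn Stream>> and poll_next simply delegates to it, so differently typed streams can live in one Vec<MessageStream>.

```rust
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Stand-in for futures_core::Stream so this compiles with std alone (assumption).
trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

// Hypothetical concrete stream: yields one value, then ends.
struct Once<T>(Option<T>);

impl<T: Unpin> Stream for Once<T> {
    type Item = T;
    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<T>> {
        Poll::Ready(self.0.take())
    }
}

// A second, differently typed stream: yields n, n-1, ..., 1, then ends.
struct Countdown(i32);

impl Stream for Countdown {
    type Item = i32;
    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<i32>> {
        if self.0 == 0 {
            return Poll::Ready(None);
        }
        let value = self.0;
        self.0 -= 1;
        Poll::Ready(Some(value))
    }
}

// The custom struct: the long combinator type is hidden behind a trait object.
struct MessageStream {
    inner: Pin<Box<dyn Stream<Item = i32>>>,
}

impl MessageStream {
    fn new(stream: impl Stream<Item = i32> + 'static) -> Self {
        Self { inner: Box::pin(stream) }
    }
}

impl Stream for MessageStream {
    type Item = i32;
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<i32>> {
        // Delegate to the boxed inner stream.
        self.inner.as_mut().poll_next(cx)
    }
}

// Waker that does nothing; enough here because these streams are always ready.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Polls a stream until it ends (or reports Pending) and collects the items.
fn drain(mut stream: MessageStream) -> Vec<i32> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut items = Vec::new();
    while let Poll::Ready(Some(item)) = Pin::new(&mut stream).poll_next(&mut cx) {
        items.push(item);
    }
    items
}

// Two different concrete stream types stored under one element type.
fn demo() -> Vec<Vec<i32>> {
    let streams: Vec<MessageStream> = vec![
        MessageStream::new(Once(Some(1))),
        MessageStream::new(Countdown(3)),
    ];
    streams.into_iter().map(drain).collect()
}

fn main() {
    println!("{:?}", demo());
}
```

With the real crates, I would presumably implement the futures Stream trait instead of the stand-in and pass each filter_map(...) result (or the merged stream) to something like MessageStream::new, but I have not verified that the trait bounds (e.g. Send) work out. The trade-off is one heap allocation and dynamic dispatch per wrapped stream.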