I have a vector of N Streams producing items of the same type T. Is there an elegant way to zip them into a Stream<Item=Vec<T>>
?
I don't think there's a particularly elegant, built-in way. I couldn't find anything related in itertools, either. So here's one possible implementation:
struct Iter<T> {
iters: Vec<T>,
}
impl<T: Iterator> Iterator for Iter<T> {
type Item = Vec<T::Item>;
fn next(&mut self) -> Option<Self::Item> {
let v: Vec<_> = self
.iters
.iter_mut()
.filter_map(Iterator::next)
.collect();
if v.len() == self.iters.len() {
Some(v)
} else {
None
}
}
}
fn zip_n<O, T>(iters: O) -> impl Iterator<Item=Vec<T::Item>>
where
O: IntoIterator<Item=T>,
T: IntoIterator,
{
Iter {
iters: iters.into_iter().map(IntoIterator::into_iter).collect(),
}
}
fn main() {
let i = vec![
String::from("a"),
String::from("b"),
];
let j = vec![
String::from("x"),
String::from("y"),
String::from("z"),
];
let k = (0..=2).map(|x| x.to_string()).collect::<Vec<_>>();
let iters = [i, j, k];
for v in zip_n(&iters) {
println!("{:?}", v);
}
}
If you have heerogeneous iterator types, you can cast each of them to a &mut dyn Iterator<Item=T>
before passing them to zip_n()
.
Couldn’t find any existing solutions for iterators either. OP might actually be talking about Stream
from futures
. But the solution there can be similar. Note that the implementation can be simplified; this is what I came up with for iterators, look at the .next
function.
pub fn zip_many<I>(iters: I) -> ZipMany<<I::Item as IntoIterator>::IntoIter>
where
I: IntoIterator,
I::Item: IntoIterator,
{
let iters = iters.into_iter().map(I::Item::into_iter).collect();
ZipMany { iters }
}
pub struct ZipMany<I> {
iters: Vec<I>,
}
impl<I: Iterator> Iterator for ZipMany<I> {
type Item = Vec<I::Item>;
fn next(&mut self) -> Option<Self::Item> {
self.iters.iter_mut().map(I::next).collect()
}
}
I’m working on an analogue for futures::stream::Stream
.
Oh, nice, I didn't realize there was a correctly-working FromIterator
impl for Option<Container<T>>
.
I’m having trouble finding (or creating) a pinned collection. Judging by its Unpin
implementation, one ought to (maybe?) be able to get a Pin<&mut T>
from a Pin<&mut [T]>
and an index. But I can’t find out how.
Before I move on with this implementation, @pkolaczk, you should clarify whether you are actually referring to futures::stream::Stream
here. If yes, what does your input look like? A vector of Box<dyn Stream<Item=T>>
s or a vector of lots of instances of a some other type S: Stream
?
Yes, unfortunately I'm referring to async Streams.
At the moment I have only a method that creates a stream and returns it as Receiver<T>
(the actual producer is on the other side of the channel). The plan is to call it many times and do a zip on them. I haven't tried putting them in Vec
yet, but probably Vec<Receiver<T>>
would work - all streams are of the same statically known type, so I guess I don't need Box nor dyn.
This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.