How to zip N streams, where N known at runtime only?

I have a vector of N Streams producing items of the same type T. Is there an elegant way to zip them into a Stream<Item=Vec<T>> ?

I don't think there's a particularly elegant, built-in way. I couldn't find anything related in itertools, either. So here's one possible implementation:

struct Iter<T> {
    iters: Vec<T>,
}
    
impl<T: Iterator> Iterator for Iter<T> {
    type Item = Vec<T::Item>;
        
    fn next(&mut self) -> Option<Self::Item> {
        let v: Vec<_> = self
            .iters
            .iter_mut()
            .filter_map(Iterator::next)
            .collect();

        if v.len() == self.iters.len() {
            Some(v)
        } else {
            None
        }
    }
}

fn zip_n<O, T>(iters: O) -> impl Iterator<Item=Vec<T::Item>> 
    where
        O: IntoIterator<Item=T>,
        T: IntoIterator,
{
    Iter { 
        iters: iters.into_iter().map(IntoIterator::into_iter).collect(),
    }
}

fn main() {
    let i = vec![
        String::from("a"),
        String::from("b"),
    ];
    
    let j = vec![
        String::from("x"),
        String::from("y"),
        String::from("z"),
    ];
    
    let k = (0..=2).map(|x| x.to_string()).collect::<Vec<_>>();
    
    let iters = [i, j, k];
    
    for v in zip_n(&iters) {
        println!("{:?}", v);
    }
}

If you have heerogeneous iterator types, you can cast each of them to a &mut dyn Iterator<Item=T> before passing them to zip_n().

Couldn’t find any existing solutions for iterators either. OP might actually be talking about Stream from futures. But the solution there can be similar. Note that the implementation can be simplified; this is what I came up with for iterators, look at the .next function.

pub fn zip_many<I>(iters: I) -> ZipMany<<I::Item as IntoIterator>::IntoIter>
where
    I: IntoIterator,
    I::Item: IntoIterator,
{
    let iters = iters.into_iter().map(I::Item::into_iter).collect();
    ZipMany { iters }
}

pub struct ZipMany<I> {
    iters: Vec<I>,
}

impl<I: Iterator> Iterator for ZipMany<I> {
    type Item = Vec<I::Item>;
    fn next(&mut self) -> Option<Self::Item> {
        self.iters.iter_mut().map(I::next).collect()
    }
}

I’m working on an analogue for futures::stream::Stream.

1 Like

Oh, nice, I didn't realize there was a correctly-working FromIterator impl for Option<Container<T>>.

I’m having trouble finding (or creating) a pinned collection. Judging by its Unpin implementation, one ought to (maybe?) be able to get a Pin<&mut T> from a Pin<&mut [T]> and an index. But I can’t find out how.

Before I move on with this implementation, @pkolaczk, you should clarify whether you are actually referring to futures::stream::Stream here. If yes, what does your input look like? A vector of Box<dyn Stream<Item=T>>s or a vector of lots of instances of a some other type S: Stream?

Yes, unfortunately I'm referring to async Streams.
At the moment I have only a method that creates a stream and returns it as Receiver<T> (the actual producer is on the other side of the channel). The plan is to call it many times and do a zip on them. I haven't tried putting them in Vec yet, but probably Vec<Receiver<T>> would work - all streams are of the same statically known type, so I guess I don't need Box nor dyn.

Here is one way to do it: playground.

3 Likes

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.