How to re-use an iterator chain without allocating a Vec?

Let's say that I have these two pipelines of operations:

let a = my_collection
    .iter()
    .map(|x| some_expensive_computation())
    .filter(|x| some_condition(x))
    .map(|x| some_transformation(x))
    .max();

let b = my_collection
    .iter()
    .map(|x| some_expensive_computation())
    .map(|x| some_other_transformation(x))
    .filter(|x| some_other_condition(x))
    .count();

As you can see they both share a common part:

my_collection
    .iter()
    .map(|x| some_expensive_computation())

How can I factor it out without doing the computation twice or allocating a Vec to store the intermediate results?


I know I can clone the iterator, but this will do the computation twice:

let iter = my_collection
    .iter()
    .map(|x| some_expensive_computation());

let a = iter.clone()
    .filter(|x| some_condition(x))
    .map(|x| some_transformation(x))
    .max();

let b = iter
    .map(|x| some_other_transformation(x))
    .filter(|x| some_other_condition(x))
    .count();

Or I could store the intermediate results in a Vec.

let tmp: Vec<_> = my_collection
    .iter()
    .map(|x| some_expensive_computation())
    .collect();

let a = tmp.iter()
    .filter(|x| some_condition(x))
    .map(|x| some_transformation(x))
    .max();

let b = tmp.into_iter()
    .map(|x| some_other_transformation(x))
    .filter(|x| some_other_condition(x))
    .count();

But I would like to neither store the intermediate results nor do the computation twice. Something like:

let iter_a = |iter| {
    iter
        .filter(|x| some_condition(x))
        .map(|x| some_transformation(x))
        .max()
};

let iter_b = |iter| {
    iter
        .map(|x| some_other_transformation(x))
        .filter(|x| some_other_condition(x))
        .count()
};

let (a, b) = my_collection
    .iter()
    .map(|x| some_expensive_computation())
    .pipe_into((iter_a, iter_b));

I feel that there is a link with async, but since I have never used it, I may be totally wrong.

You can't.

An iterator represents a stream of items which are generated on-demand. If you want to consume that stream twice, you'll need to either a) do the item generation twice, or b) store the result of one iteration somewhere so it can be cheaply reused.

You've stated you want neither solution a, nor solution b, so it is not possible.
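
To make option a) concrete, here is a small sketch that counts how often the mapping closure runs when a lazy iterator is cloned and consumed twice. The function name `calls_when_cloned` and the `x * 2` closure are stand-ins invented for illustration; they are not from the question.

```rust
use std::cell::Cell;

/// Consumes a cloned `map` iterator twice and reports how many times the
/// mapping closure ran. `x * 2` stands in for the expensive computation.
fn calls_when_cloned(data: &[u32]) -> usize {
    let calls = Cell::new(0usize);
    let iter = data.iter().map(|x| {
        calls.set(calls.get() + 1); // record every invocation
        x * 2
    });

    // First consumer works on a clone; the clone shares no computed results.
    let _max = iter.clone().max();
    // Second consumer re-runs the closure for every element.
    let _matching = iter.filter(|v| v % 4 == 2).count();

    calls.get()
}
```

For a three-element slice this reports 6 calls, so each consumer pays for the full computation.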

As an alternative, you can use the items multiple times while consuming it once by doing both operations on each item at the same time. That's a lot of words, but here's some code:

let items = my_collection
    .iter()
    .map(|x| some_expensive_computation());

let mut max = ...;
let mut count = 0;

for item in items {
  count += 1;
  max = std::cmp::max(max, item);
}

You can't conveniently do it with iterators, but you can write a loop that runs each item through both operations in an alternating manner.
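
As a rough sketch of such a loop applied to the two pipelines from the question: each element is computed once and then pushed through both branches. The function bodies below are placeholders chosen only so the example compiles, and `both_in_one_pass` is a hypothetical name.

```rust
// Placeholder bodies; only the names come from the question.
fn some_expensive_computation(x: &u32) -> u32 { x * x }
fn some_condition(v: &u32) -> bool { v % 2 == 0 }
fn some_transformation(v: u32) -> u32 { v + 100 }
fn some_other_transformation(v: u32) -> u32 { v + 1000 }
fn some_other_condition(v: &u32) -> bool { v % 2 == 1 }

/// Returns (result of pipeline `a`, result of pipeline `b`) in a single pass.
fn both_in_one_pass(my_collection: &[u32]) -> (Option<u32>, usize) {
    let mut max: Option<u32> = None;
    let mut count = 0usize;

    for x in my_collection {
        // The shared prefix runs exactly once per element.
        let item = some_expensive_computation(x);

        // Pipeline a: filter -> map -> max
        if some_condition(&item) {
            let t = some_transformation(item);
            max = Some(max.map_or(t, |m| m.max(t)));
        }

        // Pipeline b: map -> filter -> count
        let u = some_other_transformation(item);
        if some_other_condition(&u) {
            count += 1;
        }
    }

    (max, count)
}
```

The cost is that both pipelines have to be written by hand as loop bodies rather than as iterator chains.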


If the items yielded by some_expensive_computation are Clone, you could use itertools to do something like this:

use itertools::Itertools;

let (iter1, iter2) = my_collection
    .iter()
    .map(|x| some_expensive_computation())
    .tee();
let a = iter1.filter(|x| some_condition(x))
    .map(|x| some_transformation(x))
    .max();
let b = iter2.map(|x| some_other_transformation(x))
    .filter(|x| some_other_condition(x))
    .count();

See the docs for tee


Using tee like that would allocate a buffer internally, and because iter1 is fully consumed before iter2 starts, all of the elements would end up stored in it.


Here is another idea using Rc:

use std::{cell::RefCell, rc::Rc};

fn main() {
    let my_collection = vec!["hello".to_string(), "world".to_string()];
    let mut iters = sind::<_, _, 2>(
        my_collection
            .into_iter()
            .map(|x| some_expensive_computation(x)),
    );
    dbg!(iter_a(&mut iters[0]));
    dbg!(iter_b(&mut iters[1]));
}

fn sind<I, T, const N: usize>(iter: I) -> [impl Iterator<Item = Rc<T>>; N]
where
    I: Iterator<Item = T>,
{
    struct S<I: Iterator> {
        iter: Rc<RefCell<I>>,
        sharedbuf: Rc<RefCell<Vec<Rc<<I as Iterator>::Item>>>>,
        idx: usize,
    }
    impl<I> Iterator for S<I>
    where
        I: Iterator,
    {
        type Item = Rc<I::Item>;

        fn next(&mut self) -> Option<Self::Item> {
            let mut sharedbuf = self.sharedbuf.borrow_mut();
            if !sharedbuf.is_empty() {
                if let Some(item) = sharedbuf.get(self.idx) {
                    self.idx += 1;
                    return Some(item.clone());
                }
            }
            self.idx += 1;

            let next = self.iter.borrow_mut().next()?;
            let next = Rc::new(next);

            sharedbuf.push(next.clone());
            Some(next.clone())
        }
    }
    let iter = Rc::new(RefCell::new(iter));
    let sharedbuf = Rc::new(RefCell::new(vec![]));

    [0; N].map(|_| S {
        iter: iter.clone(),
        sharedbuf: sharedbuf.clone(),
        idx: 0,
    })
}

To avoid the vector needed to hold the elements, I can use threads:

use std::{sync::mpsc, thread};

fn a() {
    fn iter_a(iter: impl Iterator<Item = i32>) -> usize {
        iter.count()
    }
    fn iter_b(iter: impl Iterator<Item = i32>) -> Option<i32> {
        iter.max()
    }
    fn iter_c(iter: impl Iterator<Item = (i32, i32)> + Send + 'static) -> (usize, Option<i32>) {
        let (tx_a, rx_a) = mpsc::channel();
        let (tx_b, rx_b) = mpsc::channel();
        thread::spawn(move || {
            iter.for_each(|(a, b)| {
                tx_a.send(a).unwrap();
                tx_b.send(b).unwrap();
            });
        });
        struct I<T>(mpsc::Receiver<T>);
        impl<T> Iterator for I<T> {
            type Item = T;

            fn next(&mut self) -> Option<Self::Item> {
                self.0.recv().ok()
            }
        }
        (iter_a(I(rx_a)), iter_b(I(rx_b)))
    }
    let q = vec![1, 2, 5].into_iter().map(|i| (i.clone(), i));
    dbg!(iter_c(q));
}

Maybe this is on the right track.

What can be improved:

  • The channels have an internal buffer. I could use a std rendezvous channel (mpsc::sync_channel(0)), but that still uses a Vec internally; maybe I can write a custom rendezvous channel that holds only an Option instead of a Vec.
  • Is there a way to avoid the thread altogether?
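
One possible shape for the "only an Option" idea from the first bullet, as a sketch rather than a finished channel: a single slot guarded by a `Mutex` and a `Condvar`. `Handoff`, `State`, and `sum_through_handoff` are hypothetical names. Note that this is a one-slot handoff, not a strict rendezvous: `send` returns as soon as the item is stored, so the producer can run one item ahead of the consumer.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

struct State<T> {
    item: Option<T>, // at most one item is ever stored
    closed: bool,
}

/// Hypothetical single-slot handoff between two threads.
struct Handoff<T> {
    state: Mutex<State<T>>,
    changed: Condvar,
}

impl<T> Handoff<T> {
    fn new() -> Self {
        Handoff {
            state: Mutex::new(State { item: None, closed: false }),
            changed: Condvar::new(),
        }
    }

    /// Blocks while the slot is still occupied, then stores the item.
    fn send(&self, item: T) {
        let mut st = self.state.lock().unwrap();
        while st.item.is_some() {
            st = self.changed.wait(st).unwrap();
        }
        st.item = Some(item);
        self.changed.notify_all();
    }

    /// Signals that no more items will be sent.
    fn close(&self) {
        self.state.lock().unwrap().closed = true;
        self.changed.notify_all();
    }

    /// Blocks until an item arrives; returns `None` once closed and empty.
    fn recv(&self) -> Option<T> {
        let mut st = self.state.lock().unwrap();
        loop {
            if let Some(item) = st.item.take() {
                self.changed.notify_all();
                return Some(item);
            }
            if st.closed {
                return None;
            }
            st = self.changed.wait(st).unwrap();
        }
    }
}

/// Demo: a producer thread feeds the slot, the caller consumes it as an iterator.
fn sum_through_handoff(source: impl Iterator<Item = i32> + Send + 'static) -> i32 {
    let slot = Arc::new(Handoff::new());
    let producer_slot = Arc::clone(&slot);
    let producer = thread::spawn(move || {
        for item in source {
            producer_slot.send(item);
        }
        producer_slot.close();
    });
    let total: i32 = std::iter::from_fn(|| slot.recv()).sum();
    producer.join().unwrap();
    total
}
```

This does not address the second bullet: a thread is still needed to run the producer.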

(This part of the message should have been sent 4 hours ago; it looks like I didn't press the reply button.) Thanks, all, for your responses. I have something in mind that I need to test. It's basically tee, but where the iterators are advanced in lockstep.

It's quite similar to your idea using threads, except that I will run both iterators on a single thread. Therefore I just need a mechanism to switch the active task (from one iterator to the other) each time next() is called. If I understand correctly, writing such a state machine is possible with async. Unfortunately, I don't think I will have time to experiment with it this weekend; I really hope I will have some time on Monday.

And thanks again for all the suggestions.


I tried this some more, and I think my conclusion is that it's not possible without threads. The problem is that the current iterator methods are not generator-aware, so something like count() will consume the whole iterator without giving a chance for interleaving.

If count(), for example, were based on a generator, the yield points could be used to orchestrate the state machine.
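
On stable Rust, one way to get a similar interleaving without generators is to invert control: write each fold as explicit state that the caller advances one item at a time. This is a different technique from yield points, and it only helps for consumers you can express as a fold yourself; it does not make the standard `count()` or `max()` interleavable. `Stepper` and `sum_and_count` are hypothetical names.

```rust
/// A fold written as explicit, resumable state: the caller decides when
/// each step happens instead of the fold draining an iterator itself.
struct Stepper<B, F> {
    acc: Option<B>,
    f: F,
}

impl<B, F> Stepper<B, F> {
    fn new(init: B, f: F) -> Self {
        Stepper { acc: Some(init), f }
    }

    /// Feeds one item into the fold.
    fn step<T>(&mut self, item: T)
    where
        F: FnMut(B, T) -> B,
    {
        let acc = self.acc.take().expect("accumulator is present between steps");
        self.acc = Some((self.f)(acc, item));
    }

    /// Ends the fold and returns the accumulated value.
    fn finish(self) -> B {
        self.acc.expect("accumulator is present between steps")
    }
}

/// Drives two folds in lockstep over a single pass of `items`.
fn sum_and_count(items: impl Iterator<Item = i32>) -> (i32, usize) {
    let mut sum = Stepper::new(0i32, |acc: i32, x: i32| acc + x);
    let mut count = Stepper::new(0usize, |acc: usize, _x: i32| acc + 1);
    for item in items {
        sum.step(item);
        count.step(item);
    }
    (sum.finish(), count.finish())
}
```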

The solution with a thread is still incorrect, because iter_a will actually block, so the items for iter_b will accumulate in the tx_b channel and only be received once iter_a is done.

A correct state machine in this case needs 2 threads, for example:

    let a = thread::spawn(move || iter_a(I(rx_a)));
    let b = iter_b(I(rx_b));
    let a = a.join().unwrap();
    return (a, b);

And for the stepping to be in lockstep, as in original_iter.next() -> iter_a.next() -> iter_b.next() -> original_iter.next(), the channels need to be rendezvous channels:

let (tx, rx) = mpsc::sync_channel(0);
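
Putting the pieces together, a minimal sketch of the threaded variant might look like the following, assuming `i32` items and using `max` and `count` as the two consumers. The function name `max_and_count` is made up for this example.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

/// Feeds every item of `source` to two consumers running on separate
/// threads, using zero-capacity (rendezvous) channels.
fn max_and_count<I>(source: I) -> (Option<i32>, usize)
where
    I: Iterator<Item = i32> + Send + 'static,
{
    // Capacity 0 makes each `send` block until the receiver takes the item.
    let (tx_a, rx_a) = sync_channel::<i32>(0);
    let (tx_b, rx_b) = sync_channel::<i32>(0);

    let producer = thread::spawn(move || {
        for item in source {
            if tx_a.send(item).is_err() || tx_b.send(item).is_err() {
                break; // a consumer hung up early
            }
        }
        // Dropping the senders here ends both receiving iterators.
    });

    // One consumer gets its own thread, the other runs on the caller's thread.
    let consumer_a = thread::spawn(move || rx_a.into_iter().max());
    let count = rx_b.into_iter().count();

    producer.join().unwrap();
    (consumer_a.join().unwrap(), count)
}
```

Because both consumers run concurrently, neither channel has to hold items while the other consumer is still working.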

count uses fold internally. Here is a fold that uses nightly generators and should make this possible:

#![feature(generators, generator_trait)]

use std::ops::{Generator, GeneratorState};
use std::pin::Pin;

fn main() {
    let a = vec![1, 2, 3];

    let mut sum_gen = a.clone().into_iter().fold_gen(0, |acc, a| acc + a);
    let mut count_gen = a.into_iter().fold_gen(0, |acc, _| acc + 1);
    loop {
        match Pin::new(&mut sum_gen).resume(()) {
            GeneratorState::Yielded(_) => {
                println!("sum step");
            }
            GeneratorState::Complete(sum) => {
                println!("sum: {}", sum);
                break;
            }
        }
        match Pin::new(&mut count_gen).resume(()) {
            GeneratorState::Yielded(_) => {
                println!("count step");
            }
            GeneratorState::Complete(count) => {
                println!("count: {}", count);
            }
        }
    }
}

trait Q: Iterator {
    fn fold_gen<B: 'static, F: 'static>(
        mut self,
        init: B,
        mut f: F,
    ) -> Box<dyn Generator<Yield = (), Return = B> + Unpin>
    where
        Self: Sized + 'static,
        F: FnMut(B, Self::Item) -> B,
    {
        let mut accum = init;
        Box::new(move || loop {
            match self.next() {
                Some(x) => {
                    accum = f(accum, x);
                    yield ();
                }
                None => return accum,
            }
        })
    }
}
impl<I: Iterator> Q for I {}

Edit: It turns out that it's possible to use async as a stable generator: instead of returning the generator, we can return a custom struct that implements the Future trait, and it will work the same way.
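
A minimal, std-only sketch of that idea, under some loud assumptions: the consumers are hand-written `async fn`s (not the standard `max()`/`count()`), items are `u32`, and the consumers await nothing except the hypothetical `Next` future, since the driver uses a waker that does nothing. `Slot`, `Next`, `max_of`, `count_of`, and `drive_in_lockstep` are all names invented for this example.

```rust
use std::cell::Cell;
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; enough for a driver that polls in a loop.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

type Slot = Cell<Poll<Option<u32>>>;

/// Future that resolves once the driver has put an item into the slot.
struct Next<'a>(&'a Slot);
impl Future for Next<'_> {
    type Output = Option<u32>;
    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Take the item, leaving `Pending` behind until the next delivery.
        self.0.replace(Poll::Pending)
    }
}

async fn max_of(slot: &Slot) -> Option<u32> {
    let mut max: Option<u32> = None;
    while let Some(v) = Next(slot).await {
        max = max.max(Some(v));
    }
    max
}

async fn count_of(slot: &Slot) -> usize {
    let mut count = 0;
    while Next(slot).await.is_some() {
        count += 1;
    }
    count
}

fn drive_in_lockstep(mut source: impl Iterator<Item = u32>) -> (Option<u32>, usize) {
    let slot_a: Slot = Cell::new(Poll::Pending);
    let slot_b: Slot = Cell::new(Poll::Pending);
    let mut fut_a = pin!(max_of(&slot_a));
    let mut fut_b = pin!(count_of(&slot_b));
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let (mut out_a, mut out_b) = (None, None);
    loop {
        // Resume each unfinished consumer until it suspends at its next `.await`.
        if out_a.is_none() {
            if let Poll::Ready(r) = fut_a.as_mut().poll(&mut cx) {
                out_a = Some(r);
            }
        }
        if out_b.is_none() {
            if let Poll::Ready(r) = fut_b.as_mut().poll(&mut cx) {
                out_b = Some(r);
            }
        }
        if let (Some(a), Some(b)) = (out_a, out_b) {
            return (a, b);
        }
        // Both consumers are waiting: compute one item and hand it to both.
        let next = source.next();
        slot_a.set(Poll::Ready(next));
        slot_b.set(Poll::Ready(next));
    }
}
```

Each `.await` plays the role of a yield point, so only one item is alive at a time and no thread or buffer is involved.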

I'm continuing to tweak my idea. I still need to read up on how to use async and/or generators, because it really looks like they're what I need, but I'm still unfamiliar with them. Otherwise I will have to fall back to threads. Here is the pseudo-Rust that I should implement in the next few days.

fn main() { 
    let (a, b) = my_collection
        .iter()

        // step 1: arbitrary computation
        .map(|x| some_expensive_computation())

        .pipe_into(( 
            |iter| { iter
                // step 2.1: arbitrary computation
                .filter(|x| some_condition(x))
                .map(|x| some_transformation(x))
                .max()
            },
            |iter| { iter
                // step 2.2: another set of arbitrary computations that
                // must be run in lockstep with step 2.1 in order to have O(1) characteristics
                .map(|x| some_other_transformation(x))
                .filter(|x| some_other_condition(x))
                .count()
            }));

    // Do something with `a` (the result of step 1 + step 2.1)
    // and `b` (the result of step 1 + step 2.2).
}

// this is like `tee`, but with O(1) both in term of memory and computation
fn pipe_into<Item, T1, T2>(
    mut input: impl Iterator<Item = Item>,
    consumer: (impl Fn(impl Iterator<Item = Item>) -> T1, impl Fn(impl Iterator<Item = Item>) -> T2),
) -> (T1, T2)
    where Item: Clone
{   
    // will be replaced by `input.next()` the first time `consumer.0` call `next()` on the iterator
    // that it receives
    let mut value: Option<Item> = None;

    // This will create an iterator for each consumer, and they will all
    // be consumed in lockstep

    // Then when all consumers are finished their value is returned.

    // Note: the following logic assume that each consumer will
    // consume exactly the same number of elements from `input`.
    // It should be possible to make it more general later.
    (consumer.0(
        std::iter::from_fn(|| {
            // wait for consumer.1 to consume the value (skipped the first time)
            todo!();

            value = input.next();

            // notify consumer.1 that it can resume its execution
            todo!();

            value.clone()
            // Note: it would be possible to return `&value` instead
            // of `value` to remove the `clone`.
        })),
    consumer.1(
        std::iter::from_fn(|| -> Option<Item> {
            // notify consumer.0 that it can resume its execution
            todo!();
            // then wait for consumer.0 to generate a new value
            todo!();

            // Note: this clone can be optimized out by using
            // `std::mem::take(&mut value)`, since this is the last
            // read of the current value of `value`.
            value.clone()
        })) 
    )   
}

I gave the async approach a go, and this should be fairly straightforward... well, more or less. I tried to avoid additional allocations and needed to work around limitations of HRTB closures that return futures, or bad closure type inference.

My current code doesn't properly handle the Context/Wakers yet; there will probably be a problem when the closure passes its |iter| argument into, e.g., a separately spawned task. It should already work properly as long as the closure directly awaits the stream's elements in the same context. Anyway, I won't continue working on this before tomorrow. Here's what it currently looks like, also using the stream extension traits from async_std for the main function (because they're the most similar to the standard library's Iterator API):

#[allow(clippy::redundant_closure)]
fn main() {
    let my_collection = [(), (), ()];

    let (a, b) = futures::executor::block_on(
        stream::from_iter(my_collection.iter().map(|_x| some_expensive_computation()))
            .broadcast()
            .consume_with(
                |iter| {
                    iter.filter(|x| some_condition(x))
                        .map(|x| some_transformation(x))
                        .max()
                },
                |iter| {
                    iter.map(|x| some_other_transformation(x))
                        .filter(|x| some_other_condition(x))
                        .count()
                },
            ),
    );
    dbg!(a, b);
}

The .broadcast().consume_with(_, _) is two steps because the second method takes &mut self which allows the passed closures not to be generic over the lifetime. The code uses some shared struct on the stack for the different futures to communicate, and this data can be part of the struct returned by broadcast in order to lift the need for HRTBs while avoiding allocation, too.

It uses Cell, but could be changed to support Sync/Send. The name "broadcast" is inspired by the fact that you could (probably) achieve the same kind of functionality, though probably less efficiently, by using a tokio::sync::broadcast channel.

Here's the full code:

/*

[dependencies]
async-std = { version = "1.10.0", features = ["unstable"] }
futures = "0.3.19"
pin-project = "1.0.8"

*/

#![warn(clippy::todo)]
#![allow(clippy::type_complexity)]

use std::{cell::Cell, task::Poll};

use async_std::{prelude::*, stream};
use futures::{future::MaybeDone, ready};
use pin_project::pin_project;

struct StreamBroadcast<S: Stream> {
    stream: Option<S>,

    items: Cell<[Poll<Option<S::Item>>; 2]>,
}

trait StreamBroadcastExt: Sized + Stream {
    fn broadcast(self) -> StreamBroadcast<Self>;
}

impl<S> StreamBroadcastExt for S
where
    S: Stream,
    S::Item: Clone,
{
    fn broadcast(self) -> StreamBroadcast<Self> {
        StreamBroadcast {
            stream: Some(self),
            items: Cell::new([Poll::Pending, Poll::Pending]),
        }
    }
}

// todo: does not properly notify context, so it won't work in spawned tasks and the like right now.

/// Note: Does not work anymore if it escapes the `consume_with` call.
struct BroadcastReceiver<'a, Item>(&'a Cell<Poll<Option<Item>>>);

impl<'a, Item> Stream for BroadcastReceiver<'a, Item> {
    type Item = Item;

    fn poll_next(self: std::pin::Pin<&mut Self>, _cx: &mut std::task::Context<'_>) -> Poll<Option<Self::Item>> {
        self.0.replace(Poll::Pending)
    }
}


impl<S> StreamBroadcast<S>
where
    S: Stream,
    S::Item: Clone,
{
    async fn consume_with<'a, F1, F2, Fut1, Fut2>(
        &'a mut self,
        f1: F1,
        f2: F2,
    ) -> (Fut1::Output, Fut2::Output)
    where
        F1: FnOnce(BroadcastReceiver<'a, S::Item>) -> Fut1,
        F2: FnOnce(BroadcastReceiver<'a, S::Item>) -> Fut2,
        Fut1: Future,
        Fut2: Future,
    {
        let items = Cell::as_slice_of_cells(&self.items);
        BroadcastFuture {
            stream: self.stream.take().expect(
                "StreamBroadcast::consume_with must not be called multiple times on the same value",
            ),
            items: Some(&self.items),
            fut1: MaybeDone::Future(f1(BroadcastReceiver(&items[0]))),
            fut2: MaybeDone::Future(f2(BroadcastReceiver(&items[1]))),
        }
        .await
    }
}

#[pin_project]
struct BroadcastFuture<'a, S, Fut1, Fut2>
where
    S: Stream,
    Fut1: Future,
    Fut2: Future,
{
    #[pin]
    stream: S,
    items: Option<&'a Cell<[Poll<Option<S::Item>>; 2]>>,
    #[pin]
    fut1: MaybeDone<Fut1>,
    #[pin]
    fut2: MaybeDone<Fut2>,
}

impl<'a, S, Fut1, Fut2> Future for BroadcastFuture<'a, S, Fut1, Fut2>
where
    S: Stream,
    S::Item: Clone,
    Fut1: Future,
    Fut2: Future,
{
    type Output = (Fut1::Output, Fut2::Output);

    fn poll(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Self::Output> {
        let mut this = self.project();
        Poll::Ready(loop {
            if this.fut1.as_mut().poll(cx).is_ready() & this.fut2.as_mut().poll(cx).is_ready() {
                match (this.fut1.take_output(), this.fut2.take_output()) {
                    (Some(r1), Some(r2)) => break (r1, r2),
                    _ => panic!("BroadcastFuture was polled after completion"),
                }
            }

            if let Some(items_ref) = *this.items {
                let items = items_ref.replace([Poll::Pending, Poll::Pending]);
                if matches!(items, [Poll::Pending, Poll::Pending]) {
                    match ready!(this.stream.as_mut().poll_next(cx)) {
                        None => {
                            items_ref.set([Poll::Ready(None), Poll::Ready(None)]);
                            *this.items = None;
                        }
                        Some(next_item) => items_ref.set([
                            Poll::Ready(Some(next_item.clone())),
                            Poll::Ready(Some(next_item)),
                        ]),
                    }
                }
            }
        })
    }
}

fn some_expensive_computation() -> u32 {
    println!("expensive!!");
    42
}

fn some_condition(n: &u32) -> bool {
    n % 2 == 0
}

fn some_other_condition(n: &u32) -> bool {
    n % 2 == 1
}
fn some_transformation(n: u32) -> u32 {
    n + 100
}
fn some_other_transformation(n: u32) -> u32 {
    n + 1000
}

#[allow(clippy::redundant_closure)]
fn main() {
    let my_collection = [(), (), ()];

    let (a, b) = futures::executor::block_on(
        stream::from_iter(my_collection.iter().map(|_x| some_expensive_computation()))
            .broadcast()
            .consume_with(
                |iter| {
                    iter.filter(|x| some_condition(x))
                        .map(|x| some_transformation(x))
                        .max()
                },
                |iter| {
                    iter.map(|x| some_other_transformation(x))
                        .filter(|x| some_other_condition(x))
                        .count()
                },
            ),
    );
    dbg!(a, b);
}

Output:

expensive!!
expensive!!
expensive!!
[src\main.rs:181] a = Some(
    142,
)
[src\main.rs:181] b = 0

Just use a for loop:

let mut max = None;
let mut count = 0;
for v in my_collection
    .iter()
    .map(|x| some_expensive_computation())
    .filter(|x| some_condition(x))
    .map(|x| some_transformation(x))
{
    count += 1;
    max = Some(max.map_or(v, |m| if m > v { m } else { v }));
}
