Async Iterator (aka Stream) – Which crate to use?

I tried again to do it manually, and (kinda) succeeded :smiley:.

Of course, the .then approach is better in this simple example, but I wanted to share my result anyway:

#![feature(type_alias_impl_trait)] // needed for type … = impl …
use futures::{Stream, StreamExt};
use std::future::Future;
use std::ops::Range;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::pin;

async fn get_value(key: usize) -> String {
    format!("Value{}", key)
}

fn get_values(keys: Range<usize>) -> impl Stream<Item = String> {

    type GetValueFut = std::future::Ready<String>;
    // The following alternative requires #![feature(type_alias_impl_trait)]
    // and fails with error[E0391] (cycle detected…) during compilation:
    //type GetValueFut = impl Future<Output = String>;

    struct St {
        keys: Range<usize>,
        fut: Option<GetValueFut>,
    }
    impl Stream for St {
        type Item = String;
        fn poll_next(
            mut self: Pin<&mut Self>,
            cx: &mut Context<'_>,
        ) -> Poll<Option<<Self as Stream>::Item>> {
            match &mut self.fut {
                Some(f) => {
                    pin!(f);
                    f.poll(cx).map(|x| Some(x))
                }
                None => match self.keys.next() {
                    Some(k) => {
                        let f = get_value(k);
                        pin!(f);
                        f.poll(cx).map(|x| Some(x))
                    }
                    None => Poll::Ready(None),
                },
            }
        }
    }
    St { keys, fut: None }
}

#[tokio::main]
async fn main() {
    let stream = get_values(0..3);
    pin!(stream);
    while let Some(value) = stream.next().await {
        println!("Got: {}", value);
    }
}

Note that I need to know the exact type of the future returned by get_value for the type alias GetValueFut (which happens to be std::future::Ready in this simple example).

It would be nicer to let the compiler determine the type, such that other implementations of get_value work as well (which requires #![feature(type_alias_impl_trait)]):

type GetValueFut = impl Future<Output = String>;

When I replace the type alias with that version, I get a compiler error though:

error[E0391]: cycle detected when computing type of `get_values::GetValueFut::{opaque#0}`
  --> src/main.rs:18:24
   |
18 |     type GetValueFut = impl Future<Output = String>;
   |                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   |
note: ...which requires type-checking `get_values::<impl at src/main.rs:24:5: 45:6>::poll_next`...
  --> src/main.rs:30:24
   |
30 |             match &mut self.fut {
   |                        ^^^^
   = note: ...which requires evaluating trait selection obligation `for<'r> core::pin::Pin<&'r mut get_values::St>: core::ops::deref::DerefMut`...
   = note: ...which again requires computing type of `get_values::GetValueFut::{opaque#0}`, completing the cycle
note: cycle used when checking item types in top-level module
  --> src/main.rs:1:1
   |
1  | / #![feature(type_alias_impl_trait)] // needed for type … = impl …
2  | | use futures::{Stream, StreamExt};
3  | | use std::future::Future;
4  | | use std::ops::Range;
...  |
55 | |     }
56 | | }
   | |_^

error: could not find defining uses
  --> src/main.rs:18:24
   |
18 |     type GetValueFut = impl Future<Output = String>;
   |                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^

I assume that's because using the existential type in a function that also returns an existential type. (Edit: Maybe not, as removing one impl didn't help, see update below.) I tried to help the compiler by providing a function that should be sufficient to figure out what GetValueFut needs to be:

async fn get_value(key: usize) -> String {
    format!("Value{}", key)
}

type GetValueFut = impl Future<Output = String>;
fn _dummy() -> GetValueFut {
    get_value(0)
}

fn get_values(keys: Range<usize>) -> impl Stream<Item = String> {
    struct St {
        keys: Range<usize>,
        fut: Option<GetValueFut>,
    }
    impl Stream for St { /* … */ }
    St { keys, fut: None }
}

Now the second error ("could not find defining uses") disappears, but I still get:

error[E0391]: cycle detected when computing type of `GetValueFut::{opaque#0}`

Did I discover a bug in the type_alias_impl_trait feature, or am I doing something wrong?

I'm using rustc 1.57.0-nightly (41dfaaa3c 2021-10-10).


Update: Moving GetValuesFut and St out of the function and explicitly letting get_values return St (or not declaring get_values at all) doesn't solve the problem either, so the following example also fails to compile:

#![feature(type_alias_impl_trait)]
use futures::Stream;
use std::future::Future;
use std::ops::Range;
use std::pin::Pin;
use std::task::{Context, Poll};

async fn get_value(key: usize) -> String {
    format!("Value{}", key)
}

type GetValueFut = impl Future<Output = String>;

struct St {
    keys: Range<usize>,
    fut: Option<GetValueFut>,
}
impl Stream for St {
    type Item = String;
    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<<Self as Stream>::Item>> {
        match &mut self.fut {
            Some(f) => todo!(),
            None => todo!(),
        }
    }
}

Update 2: I can circumvent the problem by using a boxed future on the heap:

fn get_values(keys: Range<usize>) -> impl Stream<Item = String> {
    type GetValueFut = Pin<Box<dyn Future<Output = String>>>;
    struct St {
        keys: Range<usize>,
        fut: Option<GetValueFut>,
    }
    impl Stream for St {
        type Item = String;
        fn poll_next(
            mut self: Pin<&mut Self>,
            cx: &mut Context<'_>,
        ) -> Poll<Option<<Self as Stream>::Item>> {
            match &mut self.fut {
                Some(f) => {
                    pin!(f);
                    f.poll(cx).map(|x| Some(x))
                }
                None => match self.keys.next() {
                    Some(k) => {
                        let f = Box::pin(get_value(k));
                        pin!(f);
                        f.poll(cx).map(|x| Some(x))
                    }
                    None => Poll::Ready(None),
                },
            }
        }
    }
    St { keys, fut: None }
}

In that case, I can implement the stream with stable Rust. Not that it matters, but it's an extra heap allocation for each value. It would be nicer if this was solved by the compiler at compile-time instead of being dealt with at run-time.