Async Iterator (aka Stream) – Which crate to use?

I have the task of creating an interface that returns certain items, one after the other, using an asynchronous backend. As Iterator::next from the standard library would block, this is certainly not what I want to use.

Reading into it, I figured out that streams are probably what I want. Taking a look at tokio::stream, it seems that streams are available through a different crate, named tokio-stream. It seems pretty complex to create a stream, or at least laborious, considering the function signature of Stream::poll_next:

fn poll_next(
    self: Pin<&mut Self>,
    cx: &mut Context<'_>
) -> Poll<Option<Self::Item>>

Let's assume I have the following asynchronous backend:

async fn get_value(key: usize) -> String {
    /* some code here */
}

And I want to provide the following function, which returns a stream:

fn get_values(keys: Range<usize>) -> impl Stream<Item = String>;

Then I assume my get_values function must return a value that can both store the Range (which is also an Iterator over the keys) and optionally a Future for the case that a call to the poll method of the future returned by get_value returned Poll::Pending. :exploding_head:

I tried to write that down, but failed utterly.

The Tokio tutorial on streams also states that it's a pretty difficult task:

Manually implementing streams using the Stream trait can be tedious. Unfortunately, the Rust programming language does not yet support async/await syntax for defining streams. This is in the works, but not yet ready.

The async-stream crate is available as a temporary solution. This crate provides a stream! macro that transforms the input into a stream.

So I tried to do it with async-stream, which is yet another crate! I succeeded with this code:

use async_stream::stream;
use futures::{Stream, StreamExt};
use std::ops::Range;
use tokio::pin;

async fn get_value(key: usize) -> String {
    format!("Value{}", key)
}

fn get_values(keys: Range<usize>) -> impl Stream<Item = String> {
    stream! {
        for key in keys {
            yield get_value(key).await;
        }
    }
}

#[tokio::main]
async fn main() {
    let stream = get_values(0..3);
    pin!(stream);
    while let Some(value) = stream.next().await {
        println!("Got: {}", value);
    }
}

Is that the best way to deal with the problem?

If I get it right, then both tokio-stream and async-stream use the same kind of stream (futures_core::stream::Stream, which is the same as futures::stream::Stream, isn't it!?). And that stream is different from the nightly-only experimental API in std::stream::Stream which gets enabled with #![feature(async_stream)]? :dizzy_face:

That would mean tokio-stream and async-stream are interoperable, but std::stream isn't?

The stream! macro (currently) comes with some overhead, AFAIR. In this case you’re only mapping an async fn over an iterator, so you could just use

use futures::{stream, Stream, StreamExt};
use std::ops::Range;
use tokio::pin;

async fn get_value(key: usize) -> String {
    format!("Value{}", key)
}

fn get_values(keys: Range<usize>) -> impl Stream<Item = String> {
    stream::iter(keys).then(get_value)
}

#[tokio::main]
async fn main() {
    let stream = get_values(0..3);
    pin!(stream);
    while let Some(value) = stream.next().await {
        println!("Got: {}", value);
    }
}

The stream! macro is still useful for more complex situations where you can’t easily use existing Stream methods/combinators.


Nightly-only experimental API isn’t supposed to be used yet, so where’s the problem with it being a different trait? The situation used to be similar with the Future trait. Note that once Stream from std becomes stable, futures::stream::Stream will become a re-export of std::stream::Stream, just like futures::future::Future currently is a re-export of std::future::Future.

Okay, I'll keep that in mind.

Oh, cool, I tried looking at Stream::map, but figured it wasn't async. So Stream::then is what I want to use!

I really like the yield, and it's what I know from other languages too. Hopefully Rust will incorporate such a way (or a similar way) to create streams eventually. In my above example, I'd still use .then though, because it's less procedural and more functional.

No problem here, it just added to my confusion because of what I read about streams first being included in the tokio crate, then not, and the similarity of names (crate futures, crate futures-core, trait Future in std::future, crate async-stream, feature async_stream of nightly Rust, etc). All this can be a bit overwhelming, so I wanted to make sure I got this right.

But what about futures_core::stream::Stream and futures::stream::Stream? Are those two the same? One doesn't seem to be a re-export of the other? I think I'm missing something here.

It is a re-export.

https://docs.rs/futures/0.3.17/src/futures/lib.rs.html#111

Strictly speaking, futures::stream is a reexport of futures_util::stream and futures_util::stream::Stream is a reexport of futures_core::stream::Stream.

It’s unfortunately not as clear in the docs as it could be. When in doubt, click that [src] link and look at the URL where you’ve been taken. On futures::stream::Stream it takes you to https://docs.rs/futures-core/0.3.17/src/futures_core/stream.rs.html#27-98; on futures::future::Future it takes you to https://doc.rust-lang.org/nightly/src/core/future/future.rs.html#32-100; etc.

:flushed: I didn't look up to the URL bar. Thanks for that hint!

I tried again to do it manually, and (kinda) succeeded :smiley:.

Of course, the .then approach is better in this simple example, but I wanted to share my result anyway:

#![feature(type_alias_impl_trait)] // needed for type … = impl …
use futures::{Stream, StreamExt};
use std::future::Future;
use std::ops::Range;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::pin;

async fn get_value(key: usize) -> String {
    format!("Value{}", key)
}

fn get_values(keys: Range<usize>) -> impl Stream<Item = String> {

    type GetValueFut = std::future::Ready<String>;
    // The following alternative requires #![feature(type_alias_impl_trait)]
    // and fails with error[E0391] (cycle detected…) during compilation:
    //type GetValueFut = impl Future<Output = String>;

    struct St {
        keys: Range<usize>,
        fut: Option<GetValueFut>,
    }
    impl Stream for St {
        type Item = String;
        fn poll_next(
            mut self: Pin<&mut Self>,
            cx: &mut Context<'_>,
        ) -> Poll<Option<<Self as Stream>::Item>> {
            match &mut self.fut {
                Some(f) => {
                    pin!(f);
                    f.poll(cx).map(|x| Some(x))
                }
                None => match self.keys.next() {
                    Some(k) => {
                        let f = get_value(k);
                        pin!(f);
                        f.poll(cx).map(|x| Some(x))
                    }
                    None => Poll::Ready(None),
                },
            }
        }
    }
    St { keys, fut: None }
}

#[tokio::main]
async fn main() {
    let stream = get_values(0..3);
    pin!(stream);
    while let Some(value) = stream.next().await {
        println!("Got: {}", value);
    }
}

Note that I need to know the exact type of the future returned by get_value for the type alias GetValueFut (which happens to be std::future::Ready in this simple example).

It would be nicer to let the compiler determine the type, such that other implementations of get_value work as well (which requires #![feature(type_alias_impl_trait)]):

type GetValueFut = impl Future<Output = String>;

When I replace the type alias with that version, I get a compiler error though:

error[E0391]: cycle detected when computing type of `get_values::GetValueFut::{opaque#0}`
  --> src/main.rs:18:24
   |
18 |     type GetValueFut = impl Future<Output = String>;
   |                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   |
note: ...which requires type-checking `get_values::<impl at src/main.rs:24:5: 45:6>::poll_next`...
  --> src/main.rs:30:24
   |
30 |             match &mut self.fut {
   |                        ^^^^
   = note: ...which requires evaluating trait selection obligation `for<'r> core::pin::Pin<&'r mut get_values::St>: core::ops::deref::DerefMut`...
   = note: ...which again requires computing type of `get_values::GetValueFut::{opaque#0}`, completing the cycle
note: cycle used when checking item types in top-level module
  --> src/main.rs:1:1
   |
1  | / #![feature(type_alias_impl_trait)] // needed for type … = impl …
2  | | use futures::{Stream, StreamExt};
3  | | use std::future::Future;
4  | | use std::ops::Range;
...  |
55 | |     }
56 | | }
   | |_^

error: could not find defining uses
  --> src/main.rs:18:24
   |
18 |     type GetValueFut = impl Future<Output = String>;
   |                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^

I assume that's because I'm using the existential type in a function that also returns an existential type. (Edit: Maybe not, as removing one impl didn't help, see update below.) I tried to help the compiler by providing a function that should be sufficient to figure out what GetValueFut needs to be:

async fn get_value(key: usize) -> String {
    format!("Value{}", key)
}

type GetValueFut = impl Future<Output = String>;
fn _dummy() -> GetValueFut {
    get_value(0)
}

fn get_values(keys: Range<usize>) -> impl Stream<Item = String> {
    struct St {
        keys: Range<usize>,
        fut: Option<GetValueFut>,
    }
    impl Stream for St { /* … */ }
    St { keys, fut: None }
}

Now the second error ("could not find defining uses") disappears, but I still get:

error[E0391]: cycle detected when computing type of `GetValueFut::{opaque#0}`

Did I discover a bug in the type_alias_impl_trait feature, or am I doing something wrong?

I'm using rustc 1.57.0-nightly (41dfaaa3c 2021-10-10).


Update: Moving GetValueFut and St out of the function and explicitly letting get_values return St (or not declaring get_values at all) doesn't solve the problem either, so the following example also fails to compile:

#![feature(type_alias_impl_trait)]
use futures::Stream;
use std::future::Future;
use std::ops::Range;
use std::pin::Pin;
use std::task::{Context, Poll};

async fn get_value(key: usize) -> String {
    format!("Value{}", key)
}

type GetValueFut = impl Future<Output = String>;

struct St {
    keys: Range<usize>,
    fut: Option<GetValueFut>,
}
impl Stream for St {
    type Item = String;
    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<<Self as Stream>::Item>> {
        match &mut self.fut {
            Some(f) => todo!(),
            None => todo!(),
        }
    }
}

Update 2: I can circumvent the problem by using a boxed future on the heap:

fn get_values(keys: Range<usize>) -> impl Stream<Item = String> {
    type GetValueFut = Pin<Box<dyn Future<Output = String>>>;
    struct St {
        keys: Range<usize>,
        fut: Option<GetValueFut>,
    }
    impl Stream for St {
        type Item = String;
        fn poll_next(
            mut self: Pin<&mut Self>,
            cx: &mut Context<'_>,
        ) -> Poll<Option<<Self as Stream>::Item>> {
            match &mut self.fut {
                Some(f) => {
                    pin!(f);
                    f.poll(cx).map(|x| Some(x))
                }
                None => match self.keys.next() {
                    Some(k) => {
                        let f = Box::pin(get_value(k));
                        pin!(f);
                        f.poll(cx).map(|x| Some(x))
                    }
                    None => Poll::Ready(None),
                },
            }
        }
    }
    St { keys, fut: None }
}

In that case, I can implement the stream with stable Rust. Not that it matters, but it's an extra heap allocation for each value. It would be nicer if this was solved by the compiler at compile-time instead of being dealt with at run-time.

FYI, that last edit isn't quite right since you never store the future in case it returns pending. Here's a version that fixes that. (You also don't need to call pin! on a Pin<Box<_>>)

pub fn get_values(keys: Range<usize>) -> impl Stream<Item = String> {
    type GetValueFut = Pin<Box<dyn Future<Output = String>>>;
    struct St {
        keys: Range<usize>,
        fut: Option<GetValueFut>,
    }
    impl Stream for St {
        type Item = String;
        fn poll_next(
            mut self: Pin<&mut Self>,
            cx: &mut Context<'_>,
        ) -> Poll<Option<<Self as Stream>::Item>> {
            loop {
                match self.fut {
                    Some(ref mut f) => match f.as_mut().poll(cx) {
                        Poll::Pending => return Poll::Pending,
                        Poll::Ready(x) => {
                            std::mem::take(&mut self.fut);
                            return Poll::Ready(Some(x));
                        }
                    },
                    None => match self.keys.next() {
                        Some(k) => {
                            self.fut = Some(Box::pin(get_value(k)));
                        }
                        None => return Poll::Ready(None),
                    },
                }
            }
        }
    }
    St { keys, fut: None }
}
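
To see why the future has to be stored, here is a minimal std-only sketch. YieldOnce is a made-up future (not from any crate) that returns Poll::Pending on its first poll and Poll::Ready on the second, and NoopWake is a do-nothing waker used only to drive the polls by hand. Creating a fresh future for every poll never gets past the first Pending, whereas keeping the future in an Option slot lets the second poll complete:

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical future: pending on the first poll, ready on the second.
struct YieldOnce {
    polled: bool,
    value: u32,
}

impl Future for YieldOnce {
    type Output = u32;
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u32> {
        if self.polled {
            Poll::Ready(self.value)
        } else {
            self.polled = true;
            cx.waker().wake_by_ref(); // request another poll
            Poll::Pending
        }
    }
}

/// Waker that does nothing; enough for polling by hand.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Creates a fresh future for every poll and drops it if it is pending.
fn poll_fresh_each_time(polls: usize) -> Option<u32> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    for _ in 0..polls {
        let mut fut = Box::pin(YieldOnce { polled: false, value: 7 });
        if let Poll::Ready(v) = fut.as_mut().poll(&mut cx) {
            return Some(v);
        }
        // `fut` is dropped here, so its progress is lost.
    }
    None
}

/// Keeps the pending future in `slot` so the next poll resumes it.
fn poll_stored(polls: usize) -> Option<u32> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut slot: Option<Pin<Box<YieldOnce>>> = None;
    for _ in 0..polls {
        let fut = slot.get_or_insert_with(|| Box::pin(YieldOnce { polled: false, value: 7 }));
        if let Poll::Ready(v) = fut.as_mut().poll(&mut cx) {
            return Some(v);
        }
    }
    None
}
```

With get_value as written in this thread the difference is invisible, because its future is ready on the very first poll; it only shows up once the backend actually has to wait.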

You are correct. It only worked by accident with my simple example but would fail in other cases at run-time! Thanks for pointing that out.

Moreover, my previous approaches contained the same error. I just tried to fix it as follows:

#![feature(type_alias_impl_trait)] // needed for type … = impl …
use futures::{Stream, StreamExt};
use std::future::Future;
use std::ops::Range;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::pin;

fn get_value(key: usize) -> std::future::Ready<String> {
    std::future::ready(format!("Value{}", key))
}
/* BETTER:
async fn get_value(key: usize) -> String {
    format!("Value{}", key)
}
*/


fn get_values(keys: Range<usize>) -> impl Stream<Item = String> {
    type GetValueFut = std::future::Ready<String>;
    /* BETTER:
    type GetValueFut = impl Future<Output = String>;
    */

    struct St {
        keys: Range<usize>,
        fut: Option<GetValueFut>,
    }
    impl Stream for St {
        type Item = String;
        fn poll_next(
            mut self: Pin<&mut Self>,
            cx: &mut Context<'_>,
        ) -> Poll<Option<<Self as Stream>::Item>> {
            loop {
                match &mut self.fut {
                    Some(f) => {
                        pin!(f);
                        let r = f.poll(cx).map(|x| Some(x));
                        if r.is_ready() {
                            self.fut = None;
                        }
                        return r;
                    }
                    None => match self.keys.next() {
                        Some(key) => self.fut = Some(get_value(key)),
                        None => return Poll::Ready(None),
                    },
                }
            }
        }
    }
    St { keys, fut: None }
}

#[tokio::main]
async fn main() {
    let stream = get_values(0..3);
    pin!(stream);
    while let Some(value) = stream.next().await {
        println!("Got: {}", value);
    }
}

I hope I did it right this time :sweat_smile:.

Three issues remain:

  1. The corrected example won't work anymore when I use the async keyword to define get_value (i.e. uncomment the first "BETTER" block). I get error[E0308]: mismatched types (expected struct std::future::Ready<String> found opaque type impl futures::Future). That makes sense, but I don't get why it worked in my very first (wrong) attempt in my previous post.

  2. The example won't work when I use type GetValueFut = impl Future<…> (i.e. uncomment the second "BETTER" block). Again, I get the "error[E0391]: cycle detected when computing type of get_values::GetValueFut::{opaque#0}".

  3. I tried to store a Pin<GetValueFut> instead of GetValueFut (and to pin! only once in the first None branch). But I got complex type errors when I tried that. Not sure how to do it. (It shouldn't make any difference at run-time, but feels a bit cleaner?) The only way to get that to work was to use a pinned Box, but a Box is what I try to avoid here.

It seems like this particular case is blocked by the error E0391 cycle detected, which looks like it might be a compiler issue. This is a minimal case which still exhibits the error, but seems like something that should work to me (though I'm not familiar with the limits of type_alias_impl_trait).

playground

#![feature(type_alias_impl_trait)]
use futures::Stream;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

type GetValueFut = impl Future<Output = ()>;

fn get_value() -> GetValueFut {
    futures::future::ready(())
}

struct St {
    fut: Option<GetValueFut>,
}

impl Stream for St {
    type Item = ();
    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<<Self as Stream>::Item>> {
        self.fut = Some(get_value());
        return Poll::Ready(None);
    }
}

That error seems to be because the self.fut = ... line is only valid if St implements the Unpin trait, since it has to be Unpin for you to be able to mutably access it through a pinned pointer. However, determining whether St is Unpin requires figuring out whether GetValueFut is Unpin.

Unsafely unwrapping the Pin bypasses the error. playground
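
Both points can be seen in a small std-only sketch that is independent of type_alias_impl_trait. Movable, Pinned, set_movable, set_pinned and unpin_demo are names invented for this illustration. Assigning to a field through Pin<&mut T> compiles only when T: Unpin; otherwise the pin has to be unwrapped explicitly, which is unsafe because the caller must promise not to move the value:

```rust
use std::marker::PhantomPinned;
use std::pin::Pin;

/// All fields are `Unpin`, so this type is `Unpin` automatically.
struct Movable {
    value: u32,
}

/// `PhantomPinned` opts out of `Unpin`, like a compiler-generated `async` future.
struct Pinned {
    value: u32,
    _pin: PhantomPinned,
}

fn set_movable(mut this: Pin<&mut Movable>, v: u32) {
    // Compiles: `Pin<&mut T>` implements `DerefMut` when `T: Unpin`.
    this.value = v;
}

fn set_pinned(this: Pin<&mut Pinned>, v: u32) {
    // `this.value = v;` would be rejected here, because `Pinned` is not `Unpin`.
    // SAFETY: only a plain `u32` field is overwritten; `Pinned` itself is never moved.
    let inner = unsafe { this.get_unchecked_mut() };
    inner.value = v;
}

fn unpin_demo() -> (u32, u32) {
    let mut m = Movable { value: 1 };
    set_movable(Pin::new(&mut m), 5);

    let mut p = Box::pin(Pinned { value: 1, _pin: PhantomPinned });
    set_pinned(p.as_mut(), 9);

    (m.value, p.value)
}
```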

Similarly, helping out the compiler by claiming that the type is indeed Unpin also does the job

type GetValueFut = impl Future<Output = ()> + Unpin;

Of course for the original code example, i.e. like this

#![feature(type_alias_impl_trait)] // needed for type … = impl …
use futures::{Stream, StreamExt};
use std::future::Future;
use std::ops::Range;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::pin;

async fn get_value(key: usize) -> String {
    format!("Value{}", key)
}



fn get_values(keys: Range<usize>) -> impl Stream<Item = String> {

    type GetValueFut = impl Future<Output = String> + Unpin;
    fn _dummy() -> GetValueFut {
        get_value(0)
    }
    // The following alternative requires #![feature(type_alias_impl_trait)]
    // and fails with error[E0391] (cycle detected…) during compilation:
    //type GetValueFut = impl Future<Output = String>;

    struct St {
        keys: Range<usize>,
        fut: Option<GetValueFut>,
    }
    impl Stream for St {
        type Item = String;
        fn poll_next(
            mut self: Pin<&mut Self>,
            cx: &mut Context<'_>,
        ) -> Poll<Option<<Self as Stream>::Item>> {
            match &mut self.fut {
                Some(f) => {
                    pin!(f);
                    f.poll(cx).map(|x| Some(x))
                }
                None => match self.keys.next() {
                    Some(k) => {
                        let f = get_value(k);
                        pin!(f);
                        f.poll(cx).map(|x| Some(x))
                    }
                    None => Poll::Ready(None),
                },
            }
        }
    }
    St { keys, fut: None }
}

#[tokio::main]
async fn main() {
    let stream = get_values(0..3);
    pin!(stream);
    while let Some(value) = stream.next().await {
        println!("Got: {}", value);
    }
}

that doesn’t work, because the Future is not actually Unpin. In this case, you’ll have to solve the problem that you’re trying to poll self.fut without having a pinned reference to it, probably best by re-writing the struct using pin_project.
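
A quick way to check which futures are Unpin is a helper with a T: Unpin bound, as in the following std-only sketch. is_unpin and check_unpin are hypothetical names used only here. std::future::Ready and a boxed, pinned future pass the check, while the future returned by an async fn does not:

```rust
async fn get_value(key: usize) -> String {
    format!("Value{}", key)
}

/// Only callable with `Unpin` values; used as a compile-time check.
fn is_unpin<T: Unpin>(_value: &T) -> bool {
    true
}

fn check_unpin() -> (bool, bool) {
    let ready = std::future::ready(String::from("Value0"));
    let boxed = Box::pin(get_value(0));
    // is_unpin(&get_value(0)); // does not compile: the future of an `async fn` is not `Unpin`
    (is_unpin(&ready), is_unpin(&boxed))
}
```

This is also why the variants using std::future::Ready or Pin<Box<dyn Future>> could poll the field directly.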

You want to be able to pin the field fut

use pin_project::pin_project;

#[pin_project]
struct St {
    keys: Range<usize>,
    #[pin]
    fut: Option<GetValueFut>,
}

then, you’ll need to use .project() accordingly

fn poll_next(
    self: Pin<&mut Self>,
    cx: &mut Context<'_>,
) -> Poll<Option<<Self as Stream>::Item>> {
    let mut this = self.project();
    match this.fut.as_mut().as_pin_mut() {
        Some(f) => {
            let r = f.poll(cx).map(|x| Some(x));
            if r.is_ready() {
                this.fut.as_mut().set(None);
            }
            r
        }
        None => match this.keys.next() {
            Some(k) => {
                let f: GetValueFut = _get_value(k);
                this.fut.as_mut().set(Some(f));
                let r = this
                    .fut
                    .as_mut()
                    .as_pin_mut()
                    .unwrap()
                    .poll(cx)
                    .map(|x| Some(x));
                if r.is_ready() {
                    this.fut.as_mut().set(None);
                }
                r
            }
            None => Poll::Ready(None),
        },
    }
}

(I also replaced _dummy with

fn _get_value(key: usize) -> GetValueFut {
    get_value(key)
}

to make the compiler happy. And I fixed the problem that the future was never stored, and fixed the problem that the future when ready was going to be polled again (which would lead to a panic).)
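
For intuition only, here is roughly what such a projection does, written by hand with std alone. This is a sketch under my own assumptions and not the code that pin_project generates: Slot, Slot::fut, poll_slot, NoopWake and slot_demo are invented names. The field is projected with map_unchecked_mut, polled through Option::as_pin_mut, and cleared with Pin::set once the future has completed:

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical holder for one in-flight future, stored inline (no `Box`).
struct Slot<F> {
    fut: Option<F>,
}

impl<F: Future> Slot<F> {
    /// Hand-written projection from `Pin<&mut Slot<F>>` to the pinned field.
    fn fut(self: Pin<&mut Self>) -> Pin<&mut Option<F>> {
        // SAFETY: `fut` is treated as structurally pinned: it is never moved
        // out of a pinned `Slot`, only polled or replaced in place via `set`.
        unsafe { self.map_unchecked_mut(|slot| &mut slot.fut) }
    }

    /// Polls the stored future and clears the slot once it has completed.
    /// Returns `None` if the slot is empty.
    fn poll_slot(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Option<Poll<F::Output>> {
        let mut fut = self.fut();
        match fut.as_mut().as_pin_mut() {
            Some(f) => {
                let r = f.poll(cx);
                if r.is_ready() {
                    fut.set(None); // never poll a completed future again
                }
                Some(r)
            }
            None => None,
        }
    }
}

/// Waker that does nothing; enough for polling by hand.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

async fn get_value(key: usize) -> String {
    format!("Value{}", key)
}

fn slot_demo() -> (Option<String>, bool) {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    // Pin the slot on the stack; the `async fn` future lives inline in it.
    let mut slot = std::pin::pin!(Slot { fut: Some(get_value(1)) });
    let first = match slot.as_mut().poll_slot(&mut cx) {
        Some(Poll::Ready(v)) => Some(v),
        _ => None,
    };
    let now_empty = slot.as_mut().poll_slot(&mut cx).is_none();
    (first, now_empty)
}
```

The macro is still the better choice in practice, since it generates the projection without any hand-written unsafe.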

(playground)

:see_no_evil:

So it's not an error (of the compiler) but all working as expected, I guess.

Yet another issue to learn: Projections and structural pinning. Whenever I think I got a better grasp of Rust, something new comes up. :open_mouth:

Seriously: Async/pinning/unpinning, etc. can be HELL for a newcomer to the language!! :hot_face: (And I'm not even that new to it…)

I'd like to repeat my earlier quote (from the Tokio tutorial):

Manually implementing streams using the Stream trait can be tedious.

To get back to the second half of the topic of this thread: Which crate to use? This would be my personal summary:

Feel free to add anything, and thanks for all that help!

Note that, arguably, AFAIK that overhead is small. Just not zero.

That’s not exactly true. I think the people who work on the compiler error messages would consider it an error that the compiler wasn’t able to correctly diagnose the cause.

I think I’d go further and say that there’s more of a bug here than just a diagnostics issue.

The cycle (in the minimal example above)

   Compiling playground v0.0.1 (/playground)
error[E0391]: cycle detected when computing type of `GetValueFut::{opaque#0}`
  --> src/lib.rs:7:20
   |
7  | type GetValueFut = impl Future<Output = ()>;
   |                    ^^^^^^^^^^^^^^^^^^^^^^^^
   |
note: ...which requires type-checking `<impl at src/lib.rs:17:1: 26:2>::poll_next`...
  --> src/lib.rs:23:9
   |
23 |         self.fut = Some(get_value());
   |         ^^^^
   = note: ...which requires evaluating trait selection obligation `for<'r> core::pin::Pin<&'r mut St>: core::ops::deref::DerefMut`...
   = note: ...which again requires computing type of `GetValueFut::{opaque#0}`, completing the cycle
note: cycle used when checking item types in top-level module
  --> src/lib.rs:1:1
   |
1  | / #![feature(type_alias_impl_trait)] // needed for type … = impl …
2  | | use futures::Stream;
3  | | use std::future::Future;
4  | | use std::pin::Pin;
...  |
25 | |     }
26 | | }
   | |_^

For more information about this error, try `rustc --explain E0391`.

can be broken by removing the need to “compute type of GetValueFut::{opaque#0}” in order to determine whether dereferencing the Pin<&mut …> reference is okay. That’s the approach of the proposed fix to use unsafe code, or alternatively use something like pin_project, or to add some + Unpin to the type alias.

But there are two pieces to this cycle. Arguably, the other piece, i.e. the claim that

computing type of `GetValueFut::{opaque#0}`
requires type-checking `<impl at src/lib.rs:17:1: 26:2>::poll_next`

seems questionable. The compiler is able to figure out that the definition of poll_next does not constitute a defining use of GetValueFut: if you leave out fn get_value and add the + Unpin (playground), it’s able to figure out that there’s no defining use of GetValueFut; and even without the + Unpin, with the cycle error still present, it’s still able to figure out that there’s no defining use (playground). Thus I’m unsure why computing the type of GetValueFut doesn’t just depend on type-checking fn get_value, and nothing more.

I don't like syntax macros when writing complex stream logic (typing, formatting, IDE hints, ...).

So I wrote a crate, transform_stream, which is inspired by async-stream.

crate             overhead  alloc  types   coding style
futures-util      zero      zero   public  combinator
async-stream      small     zero   hidden  generator-like
transform-stream  small     zero   public  generator-like

I wonder: Is there any macro allowing me to iterate more easily over an existing stream?

I have the following situation:

let mut stream = self.some_immutable_method(/* … */);
while let Some(item) = stream.next().await {
    /* … */
}
drop(stream);
self.some_mutable_method(/* … */);

I effectively have to write three lines for the iteration:

  • let mut stream = …
  • while let Some(item) = stream.next().await
  • drop(stream) (to avoid issues with later borrowing of self in this example)

Isn't it possible to replace this with a simple(?) macro as long as the language doesn't directly support iterating over streams? Maybe there exists a crate providing such a macro?
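
As a rough idea of what such a macro could look like, here is a std-only sketch with macro_rules!. Everything in it is hypothetical: for_each_item! is not taken from an existing crate, Items merely stands in for a stream that borrows self (it has an inherent async next instead of implementing Stream), and block_on is a busy-polling executor that exists only to run the demonstration. The macro wraps the loop in its own block, so the stream is dropped at the end of that block and no explicit drop(stream) is needed:

```rust
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical helper macro: loops over `$stream` inside its own block, so
/// the stream (and any borrow it holds) is dropped when the loop ends.
macro_rules! for_each_item {
    ($item:pat in $stream:expr => $body:block) => {{
        let mut stream = $stream;
        while let Some($item) = stream.next().await $body
    }};
}

/// Stand-in for a stream that borrows from `Store`.
struct Items<'a> {
    iter: std::slice::Iter<'a, u32>,
}

impl Items<'_> {
    async fn next(&mut self) -> Option<u32> {
        self.iter.next().copied()
    }
}

struct Store {
    data: Vec<u32>,
    total: u32,
}

impl Store {
    fn items(&self) -> Items<'_> {
        Items { iter: self.data.iter() }
    }

    async fn sum_and_store(&mut self) -> u32 {
        let mut sum = 0;
        for_each_item!(item in self.items() => {
            sum += item;
        });
        // The stream is already gone, so `self` can be borrowed mutably.
        self.total = sum;
        sum
    }
}

/// Waker that does nothing; used by the demo executor below.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Minimal busy-polling executor, only for this demonstration.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

fn macro_demo() -> (u32, u32) {
    let mut store = Store { data: vec![1, 2, 3], total: 0 };
    let sum = block_on(store.sum_and_store());
    (sum, store.total)
}
```

With a real Stream, next comes from StreamExt and requires the stream to be Unpin, so an actual version of the macro would probably have to pin the stream first (for example with tokio::pin!) before looping.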