Announcing stream_generator (creates a stream from an async generator function)

stream_generator is an alternative to async-stream and genawaiter.
It allows you to create a stream from a generator function:

use futures::{stream::StreamExt, Stream};
use stream_generator::generate_stream;

fn my_stream(start: u32) -> impl Stream<Item=u32> {
    generate_stream(move |mut y| async move {
        for i in start.. {
            y.send(i).await;
            if i == 45 {
                break;
            }
        }
    })
}

#[tokio::main]
async fn main() {
    let values: Vec<_> = my_stream(42).collect().await;
    assert_eq!(values, vec![42, 43, 44, 45]);
}

There is also Result-friendly generate_try_stream helper.

Key features:

  • Very simple, safe-only implementation which only depends on the futures crate;
  • No macros and no unstable syntax, so the code is easy to understand, doesn't clash with other macros, and rustfmt works on it;
  • The yielder is a proper public type, so a complex generator function can be easily split into multiple functions receiving the yielder as an argument.
  • generate_try_stream allows you to explicitly yield both Ok and Err values, and the stream doesn't have to end after yielding an Err item.
7 Likes

It reminds me of once_cell compared to lazy_static. It's nice to have a "macroless" way of doing things.

An unfortunate caveat here is that the user may write a generator function with "bad form". Perhaps these corner cases should be documented. What happens in these cases?

generate_stream(move |mut y| async move {
  y.send(1)
  // function completes with a pending future
});

or

generate_stream(move |mut y| async move {
  y.send(1);
  // send without awaiting the previous send
  y.send(2).await;
});

Also, I wonder if it would be better to provide a mutable reference to Yielder? Then the user would not have to write mut.

This will not typecheck because the future is required to resolve to ().

Yes, this is an easy mistake to make in general, but it's not really specific to this crate. Luckily, the compiler does a good job of preventing that with a warning (unused implementer of futures::Future that must be used).

I've considered this, but providing an owned Yielder is more flexible and avoids extra complications with lifetimes. For example, generate_try_stream is now implemented trivially using generate_stream, but with the proposed change, it would not be easy to fix. It's also nice that you can freely store an owned yielder in a struct's field or even move it to a new spawned task.

I forgot a semicolon.

I disagree. This crate allows me to send without suspending (await-ing). The other crates you mentioned do not have such a possibility.

A warning does not prevent compiling. Developers ignore warnings sometimes (shame on them!).

Sure, you can reasonably expect the user to avoid these cases. But I think you should at least consider how the library should behave in these cases when they arise. It is a matter of when, not if.