How to create async_std::stream::Stream which calls async function in poll_next?

Hey Rust folks, I'm trying to understand how to create an async_std::stream::Stream that calls an async function in its poll_next method.

My current code to test out this concept is included below. get_async_num() -> i32 is a test function I'm using (and will be replaced with something else later).

I know that poll_next is not an async function, so the current error I get is:
`await` is only allowed inside `async` functions and blocks

Here is the code:

use async_std::stream::Stream;
use async_std::task;
use futures::stream::StreamExt;
use rand::distributions::{Distribution, Uniform};
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
use std::process;
use golgi::GolgiError; // this is just an error type I made to combine errors with

pub async fn get_async_num() -> i32 {
    let millis = Uniform::from(0..10).sample(&mut rand::thread_rng());
    println!("get_async_num will complete in {} ms", millis);

    task::sleep(Duration::from_millis(millis)).await;
    let i: i32 = rand::random();
    i
}

pub struct SourceStream;

impl Stream for SourceStream {
    type Item = i32;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        let num = get_async_num().await;
        Poll::Ready(Some(num))
    }
}

// the code below is just to create a main function, and a runtime, and create the stream above

async fn run() -> Result<(), GolgiError> {
    let source_stream = SourceStream{};
    futures::pin_mut!(source_stream);
    loop {
        let next_item = source_stream.next().await;
        println!("item: {:?}", next_item);
        task::sleep(Duration::from_millis(1000)).await;
    }
}

#[async_std::main]
async fn main() {
    if let Err(e) = run().await {
        eprintln!("Application error: {}", e);
        process::exit(1);
    }
}

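On the other hand, I can see that this version, which uses then, works:

async fn run() -> Result<(), GolgiError> {
    use futures::stream::iter; // needed for iter(0..) below

    pub fn get_source_stream() -> impl Stream<Item = i32> {
        // For each element of the endless range, await get_async_num()
        // and yield its result; the range value itself is ignored.
        iter(0..).then(|_i| get_async_num())
    }

    let source_stream = get_source_stream();
    futures::pin_mut!(source_stream);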

    loop {
        let next_item = source_stream.next().await;
        println!("item: {:?}", next_item);
        task::sleep(Duration::from_millis(1000)).await;
    }
}

Is there a way to translate the second version into the first? Or is there a way to call an async function for each element of a stream, not just as a transformation, but inside poll_next itself?

In the end, based on the result of the async function, I want to either return the value or end the stream. For example, if get_async_num returns 0, the stream should end.

The async-stream crate provides a macro to easily make .await-based streams: https://crates.io/crates/async-stream


This crate has helped me get a working solution, thanks!

I would still be curious to understand how one could implement this without the macro, but I'm happy to have something that works.

You would need to store the future returned by the async function in the stream struct and poll it in the poll_next method.
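To make that concrete, here is a minimal sketch of the idea, not a definitive implementation. It rests on a few assumptions that are not in the original code: to stay dependency-free it declares a local Stream trait with the same poll_next signature as the one in futures / async_std, it replaces get_async_num with a hypothetical deterministic version that simply returns its argument, and it adds a hypothetical drain helper that polls the stream by hand. The stream keeps the in-flight future in an Option, creates a new one when none is pending, polls it, and ends the stream when the result is 0.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Local stand-in with the same shape as futures::Stream, so the sketch
// compiles with only the standard library (assumption for illustration).
pub trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

// Hypothetical replacement for get_async_num: returns its input so the
// behaviour is deterministic.
async fn get_async_num(n: i32) -> i32 {
    n
}

type BoxFut = Pin<Box<dyn Future<Output = i32> + Send>>;

pub struct SourceStream {
    next_input: i32,
    pending: Option<BoxFut>, // the future currently being driven, if any
    done: bool,
}

impl SourceStream {
    pub fn new(start: i32) -> Self {
        SourceStream { next_input: start, pending: None, done: false }
    }
}

impl Stream for SourceStream {
    type Item = i32;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<i32>> {
        // All fields are Unpin (a Pin<Box<..>> is Unpin), so get_mut is allowed.
        let this = self.get_mut();
        if this.done {
            return Poll::Ready(None);
        }
        // Start a new call only if there is no future in flight.
        if this.pending.is_none() {
            let n = this.next_input;
            this.next_input -= 1;
            this.pending = Some(Box::pin(get_async_num(n)));
        }
        let polled = this.pending.as_mut().unwrap().as_mut().poll(cx);
        match polled {
            // Not finished: keep the future stored; it registered cx's waker.
            Poll::Pending => Poll::Pending,
            Poll::Ready(num) => {
                this.pending = None; // a completed future must not be polled again
                if num == 0 {
                    this.done = true;
                    Poll::Ready(None) // 0 ends the stream
                } else {
                    Poll::Ready(Some(num))
                }
            }
        }
    }
}

// Demo-only driver: polls with a waker that does nothing and spins on Pending.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

pub fn drain<S: Stream + Unpin>(mut s: S) -> Vec<S::Item> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut out = Vec::new();
    loop {
        match Pin::new(&mut s).poll_next(&mut cx) {
            Poll::Ready(Some(v)) => out.push(v),
            Poll::Ready(None) => break,
            Poll::Pending => std::thread::yield_now(),
        }
    }
    out
}
```

With the real traits you would implement async_std::stream::Stream (or futures::Stream) instead of the local trait, box the future returned by your actual get_async_num(), and consume the stream with source_stream.next().await as in the question rather than with drain. Boxing costs one allocation per item; avoiding it generally requires pin projection, which is more involved.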
