Hey Rust folks, I'm trying to understand how to create an async_std::stream::Stream that calls an async function in its poll_next method.
My current code for testing this concept is included below. get_async_num() -> i32
is a test function I'm using (it will be replaced with something else later).
I know that poll_next is not an async function, so the error I currently get is:
`await` is only allowed inside `async` functions and blocks
Here is the code:
use async_std::stream::Stream;
use async_std::task;
use futures::stream::StreamExt;
use rand::distributions::{Distribution, Uniform};
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
use std::process;
use golgi::GolgiError; // this is just an error type I made to combine errors with

pub async fn get_async_num() -> i32 {
    let millis = Uniform::from(0..10).sample(&mut rand::thread_rng());
    println!("get_async_num will complete in {} ms", millis);
    task::sleep(Duration::from_millis(millis)).await;
    let i: i32 = rand::random();
    i
}

pub struct SourceStream;

impl Stream for SourceStream {
    type Item = i32;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        let num = get_async_num().await;
        Poll::Ready(Some(num))
    }
}

// the code below is just to create a main function, and a runtime, and create the stream above
async fn run() -> Result<(), GolgiError> {
    let source_stream = SourceStream{};
    futures::pin_mut!(source_stream);
    loop {
        let next_item = source_stream.next().await;
        println!("item: {:?}", next_item);
        task::sleep(Duration::from_millis(1000)).await;
    }
}

#[async_std::main]
async fn main() {
    if let Err(e) = run().await {
        eprintln!("Application error: {}", e);
        process::exit(1);
    }
}
On the other hand, I can see that this version, which uses then, works:
// presumably `iter` here is futures::stream::iter, in addition to the imports above
use futures::stream::iter;

async fn run() -> Result<(), GolgiError> {
    pub fn get_source_stream() -> impl Stream<Item = i32> {
        // `then` awaits the future returned by the closure for each element
        iter(0..).then(|_i| {
            let x = get_async_num();
            x
        })
    }

    let source_stream = get_source_stream();
    futures::pin_mut!(source_stream);
    loop {
        let next_item = source_stream.next().await;
        println!("item: {:?}", next_item);
        task::sleep(Duration::from_millis(1000)).await;
    }
}
Is there a way to translate the second version into the first version? Or is there a way to call an async function for each element of a stream, not just as a transformation, but inside poll_next?
In the end, based on the result of the async function, I want to either return the value or end the stream. For example, if get_async_num returns 0, the stream should end.
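
To make the goal concrete, here is a rough sketch of the behavior described above, using only the standard library. The idea it assumes is that poll_next cannot await, but the stream can store the in-flight future in a field and poll it by hand on each call. Everything except the shape of poll_next is a stand-in invented for illustration: the local Stream trait mirrors the signature of the real one, get_async_num_stub replaces get_async_num with a deterministic countdown, YieldOnce imitates a sleep by returning Pending once, and drain with NoopWake is a toy driver rather than a real executor.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Local stand-in with the same method signature as the real Stream trait,
// so that this sketch compiles without external crates.
pub trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

// Hypothetical future that returns Pending once before completing (imitates a sleep).
struct YieldOnce(bool);

impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            Poll::Ready(())
        } else {
            self.0 = true;
            cx.waker().wake_by_ref(); // ask to be polled again
            Poll::Pending
        }
    }
}

// Hypothetical, deterministic replacement for get_async_num.
async fn get_async_num_stub(n: i32) -> i32 {
    YieldOnce(false).await;
    n
}

pub struct SourceStream {
    remaining: i32,
    // The future for the item currently being produced, kept across polls.
    in_flight: Option<Pin<Box<dyn Future<Output = i32>>>>,
}

impl SourceStream {
    pub fn new(start: i32) -> Self {
        SourceStream { remaining: start, in_flight: None }
    }
}

impl Stream for SourceStream {
    type Item = i32;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<i32>> {
        // All fields are Unpin, so a plain mutable reference is available.
        let this = self.get_mut();
        if this.in_flight.is_none() {
            let n = this.remaining;
            // Count down toward 0 and stay there, so a finished stream keeps ending.
            this.remaining = (n - 1).max(0);
            let fut: Pin<Box<dyn Future<Output = i32>>> = Box::pin(get_async_num_stub(n));
            this.in_flight = Some(fut);
        }
        let fut = this.in_flight.as_mut().expect("future was stored above");
        match fut.as_mut().poll(cx) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(num) => {
                this.in_flight = None; // the next call starts a fresh future
                if num == 0 { Poll::Ready(None) } else { Poll::Ready(Some(num)) }
            }
        }
    }
}

// Toy driver: a waker that does nothing, plus a busy-polling loop.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

pub fn drain(mut stream: SourceStream) -> Vec<i32> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut items = Vec::new();
    loop {
        match Pin::new(&mut stream).poll_next(&mut cx) {
            Poll::Ready(Some(n)) => items.push(n),
            Poll::Ready(None) => return items,
            Poll::Pending => continue,
        }
    }
}
```

With this sketch, drain(SourceStream::new(3)) collects 3, 2, 1 and stops when the stub returns 0. In the real program the stored future would presumably be Box::pin(get_async_num()), and the runtime would do the polling through next() instead of the busy loop. If a hand-written poll_next is not strictly required, combinators from the futures crate such as take_while after then, or stream::unfold, may express the same thing with less code.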