Hi,
I am wondering whether anybody has thought of approaches to writing futures (and/or streams) in async/await syntax while keeping their state observable (hence, non-anonymous)?
In my particular case, I need the underlying state of a stream to be serializable, so I have to implement the polling function and manage the state machine explicitly. This, however, comes at a rather significant cost in mental overhead and complexity (case in point: https://github.com/bpxe/bpxe/blob/master/bpxe/src/activity/mod.rs). It feels like if I were able to write the entire piece in async/await fashion, it would be far easier to manage.
So I've tried to imagine a proc-macro that would convert async functions into an explicitly typed state machine, and hand-coded a rough approximation of what the input and the generated code could look like (see the code snippet below).
Are there any other approaches that can be taken to solve this problem? Am I overthinking or overlooking something?
Thanks!
#![allow(non_camel_case_types)]
// The code below is a VERY VERY VERY VERY LOOSE AND HAND-WAVY APPROXIMATION
// of what might be generated for this snippet:
//
// ```
// use serde::*;
// use tokio::task;
//
// #[asyncify(methods(test))]
// #[derive(Serialize, Deserialize)]
// struct Test {
//     something: u8,
// }
//
// #[asyncify]
// impl Test {
//     async fn test(&mut self) -> u8 {
//         let counter = self.something + 1;
//         task::yield_now().await;
//         self.something = counter;
//         counter
//     }
// }
// ```
//
use serde::*;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::task;

// Note how this is serializable
#[derive(Serialize, Deserialize)]
struct Test {
    something: u8,
    asyncify_test: Option<Asyncify_Test_test>,
}

// Note how this is serializable
#[derive(Serialize, Deserialize)]
enum Asyncify_Test_test {
    Start,
    Yield1 { counter: u8 },
    End(u8),
}

enum Asyncify_Test_test_yield {
    Yield1(Pin<Box<dyn Future<Output = ()>>>, Option<()>),
}

impl Test {
    fn test(&mut self) -> impl Future<Output = u8> + '_ {
        self.asyncify_test = Some(Asyncify_Test_test::Start);
        Asyncify_Test_test_future(self, None)
    }
}

pub struct Asyncify_Test_test_future<'a>(&'a mut Test, Option<Asyncify_Test_test_yield>);

impl<'a> Future for Asyncify_Test_test_future<'a> {
    type Output = u8;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        match self.as_mut().1.take() {
            None => {}
            Some(Asyncify_Test_test_yield::Yield1(mut y, _)) => match y.as_mut().poll(cx) {
                Poll::Pending => {
                    self.as_mut().1 = Some(Asyncify_Test_test_yield::Yield1(y, None));
                    return Poll::Pending;
                }
                Poll::Ready(v) => {
                    self.as_mut().1 = Some(Asyncify_Test_test_yield::Yield1(y, Some(v)));
                }
            },
        }
        match self.as_mut().0.asyncify_test.take() {
            None => panic!(),
            Some(Asyncify_Test_test::Start) => {
                self.as_mut().0.asyncify_test = Some(Asyncify_Test_test::Yield1 {
                    counter: self.as_ref().0.something + 1,
                });
                cx.waker().wake_by_ref();
                Poll::Pending
            }
            Some(Asyncify_Test_test::Yield1 { counter }) => {
                match self.as_mut().1.take() {
                    None => {
                        self.as_mut().1 = Some(Asyncify_Test_test_yield::Yield1(
                            Box::pin(task::yield_now()),
                            None,
                        ));
                        self.as_mut().0.asyncify_test =
                            Some(Asyncify_Test_test::Yield1 { counter });
                    }
                    Some(Asyncify_Test_test_yield::Yield1(_, Some(()))) => {
                        self.as_mut().0.asyncify_test = Some(Asyncify_Test_test::End(counter));
                        self.as_mut().0.something = counter;
                    }
                    Some(y) => {
                        self.as_mut().1 = Some(y);
                        self.as_mut().0.asyncify_test =
                            Some(Asyncify_Test_test::Yield1 { counter });
                    }
                }
                cx.waker().wake_by_ref();
                Poll::Pending
            }
            Some(Asyncify_Test_test::End(v)) => Poll::Ready(v),
        }
    }
}

// And this is our test
#[tokio::main]
async fn main() {
    let mut test = Test {
        something: 0,
        asyncify_test: None,
    };
    println!("{}", test.test().await);
    println!("{}", test.something);
}
Output:
1
1
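
To make the "observable state" part more concrete, here is a stripped-down, std-only sketch of the same idea, driven by hand instead of by tokio. It is only an approximation under a few assumptions: the names `TestState`, `TestFuture`, `NoopWake` and `run_with_snapshot` are made up for illustration, `Clone` stands in for a serde round-trip, and the inner `task::yield_now()` future is replaced by a plain self-wake so nothing non-serializable has to be kept around. It polls once, snapshots the state at the yield point, and resumes from the snapshot in a fresh value.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical stand-in for the generated state enum. A real version would
// also derive serde's Serialize/Deserialize; Clone plays that role here.
#[derive(Debug, Clone, PartialEq)]
enum TestState {
    Start,
    Yield1 { counter: u8 },
    End(u8),
}

#[derive(Debug, Clone)]
struct Test {
    something: u8,
    state: Option<TestState>,
}

// The future only borrows `Test`; all resumable state lives in `Test`.
struct TestFuture<'a>(&'a mut Test);

impl<'a> Future for TestFuture<'a> {
    type Output = u8;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u8> {
        match self.0.state.take() {
            None => panic!("polled without any state"),
            Some(TestState::Start) => {
                let counter = self.0.something + 1;
                self.0.state = Some(TestState::Yield1 { counter });
                // Assumption: a self-wake emulates the single yield point.
                cx.waker().wake_by_ref();
                Poll::Pending
            }
            Some(TestState::Yield1 { counter }) => {
                self.0.something = counter;
                self.0.state = Some(TestState::End(counter));
                Poll::Ready(counter)
            }
            Some(TestState::End(v)) => {
                self.0.state = Some(TestState::End(v));
                Poll::Ready(v)
            }
        }
    }
}

// A waker that does nothing; enough for polling by hand.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Polls once, snapshots the state, then resumes from the snapshot.
// Returns (state observed at the yield point, future output, final field).
fn run_with_snapshot() -> (TestState, u8, u8) {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);

    let mut test = Test { something: 0, state: Some(TestState::Start) };
    let first = Pin::new(&mut TestFuture(&mut test)).poll(&mut cx);
    assert!(first.is_pending());

    // This is the point where the state could be serialized and stored.
    let snapshot = test.clone();
    let observed = snapshot.state.clone().expect("state is set after a poll");

    // Resume in a fresh value, as if it had just been deserialized.
    let mut restored = snapshot;
    let result = match Pin::new(&mut TestFuture(&mut restored)).poll(&mut cx) {
        Poll::Ready(v) => v,
        Poll::Pending => unreachable!("only one yield point in this sketch"),
    };
    (observed, result, restored.something)
}

fn main() {
    let (observed, result, something) = run_with_snapshot();
    // prints: Yield1 { counter: 1 } 1 1
    println!("{:?} {} {}", observed, result, something);
}
```

The part this sketch sidesteps is exactly the hard one: in the hand-coded version above, the boxed inner future lives outside the serializable enum, so after a restore it presumably has to be re-created rather than resumed.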