How to run async code within Option / Result function chain?

#![allow(dead_code)]

// Some async op. E.g., access database.
async fn do_some_async(_num: i32) -> bool {
    true
}

// How to run async code within a long Option / Result function chain?
// E.g., value.and_then(...).or_else(...).map(...)

// First try:
async fn not_work1(input: Option<i32>) -> Option<i32> {
    input
        .and_then(|num| {
            if do_some_async(num).await {   // not work, can not use `.await`
                                            // because not in async block
                Some(42)
            } else {
                None
            }
        })
}

// Second try: 
async fn not_work2(input: Option<i32>) -> Option<u32> {
    input
        .and_then(|num| async { // add async here
                                // not work, return `impl Future` but not Option
            if do_some_async(num).await {
                Some(42)
            } else {
                None
            }
        })
}

// This one works, but I don't want to hoist the operation into the outer scope every time...
async fn work(input: Option<i32>) -> Option<u32> {
    match input {
        Some(num) => {
            if do_some_async(num).await {
                Some(42)
            } else {
                None
            }
        }
        None => None
    }
}


Errors:

   Compiling playground v0.0.1 (/playground)
error[E0728]: `await` is only allowed inside `async` functions and blocks
  --> src/lib.rs:15:16
   |
14 |         .and_then(|num| {
   |                   ----- this is not `async`
15 |             if do_some_async(num).await {   // not work, can not use `.await`
   |                ^^^^^^^^^^^^^^^^^^^^^^^^ only allowed inside `async` functions and blocks

error[E0308]: mismatched types
  --> src/lib.rs:27:25
   |
27 |           .and_then(|num| async { // add async here
   |  _________________________^
28 | |                                 // not work, return `impl Future` but not Option
29 | |             if do_some_async(num).await {
30 | |                 Some(42)
...  |
33 | |             }
34 | |         })
   | |_________^ expected enum `Option`, found opaque type
   |
   = note:     expected enum `Option<_>`
           found opaque type `impl Future`
help: try using a variant of the expected enum
   |
27 |         .and_then(|num| Some(async { // add async here
28 |                                 // not work, return `impl Future` but not Option
29 |             if do_some_async(num).await {
30 |                 Some(42)
31 |             } else {
32 |                 None
 ...

error: aborting due to 2 previous errors

Some errors have detailed explanations: E0308, E0728.
For more information about an error, try `rustc --explain E0308`.
error: could not compile `playground`

To learn more, run the command again with --verbose.

I feel I need to convert Option / Result into something async-compatible. Maybe the futures crate has such a thing, but I'm not sure what I'm looking for. Any suggestions?

I would write this particular example function e.g. as

async fn work2(input: Option<i32>) -> Option<u32> {
    let num = input?;
    do_some_async(num).await.then(|| 42)
}

(in particular using “?”-operator syntax); unfortunately it’s not possible to use existing methods like and_then when you want to .await something inside of the closure.
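
As a minimal sketch of how this style might scale to several dependent steps, each step can bail out early with `?` instead of nesting closures. The second async helper, `lookup_bonus`, is hypothetical and not part of the original question:

```rust
// Stand-in for the async operation from the question.
async fn do_some_async(_num: i32) -> bool {
    true
}

// Hypothetical second async step, invented for this sketch.
async fn lookup_bonus(num: u32) -> Option<u32> {
    num.checked_add(5)
}

async fn chain_with_question_mark(input: Option<i32>) -> Option<u32> {
    // Returns None early if the input is None.
    let num = input?;
    // `bool::then` turns the check into an Option; `?` bails out on false.
    let base = do_some_async(num).await.then(|| 42u32)?;
    // Returns None early if the second step yields None.
    let total = lookup_bonus(base).await?;
    Some(total)
}
```

Because `?` and `.await` are both plain expressions in the body of the async fn, no closure is involved and the "not in async block" error does not arise.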


Wait, let me double-check that for the concrete example… I’m possibly confusing this example with something else…

Edit: Yes, right… and_then is problematic because it has to inspect the closure’s return value as an Option right away, which it cannot do when the closure returns a future. Something like map may still be somewhat useful (though it creates an Option<…future…>).

This means, rewriting x.and_then(f) into x.map(f).flatten() can help. Together with futures::future::OptionFuture, you can then do OptionFuture::from(x.map(f)).await.flatten()

use futures::future::OptionFuture;

async fn works3(input: Option<i32>) -> Option<u32> {
    OptionFuture::from(input.map(|num| async move {
        if do_some_async(num).await {
            Some(42)
        } else {
            None
        }
    }))
    .await
    .flatten()
}
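
The rewrite relies on `x.and_then(f)` and `x.map(f).flatten()` agreeing for ordinary closures. A small synchronous sketch of that identity (the helper `half_if_even` is made up purely for illustration):

```rust
// Hypothetical synchronous closure body, used only to illustrate the identity.
fn half_if_even(n: i32) -> Option<i32> {
    if n % 2 == 0 {
        Some(n / 2)
    } else {
        None
    }
}

fn via_and_then(x: Option<i32>) -> Option<i32> {
    x.and_then(half_if_even)
}

fn via_map_flatten(x: Option<i32>) -> Option<i32> {
    // `map` yields Option<Option<i32>>; `flatten` collapses the two layers.
    x.map(half_if_even).flatten()
}
```

In the async version, the `.await` slots in between the `map` and the `flatten`, which is the gap that `OptionFuture` fills.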

Btw, Option could in principle gain async-aware methods along these lines:

use std::future::Future;

async fn option_and_then<T, U, F, Fut>(x: Option<T>, f: F) -> Option<U>
where
    F: FnOnce(T) -> Fut,
    Fut: Future<Output = Option<U>>,
{
    f(x?).await
}

async fn works4(input: Option<i32>) -> Option<u32> {
    option_and_then(input, |num| async move {
        if do_some_async(num).await {
            Some(42)
        } else {
            None
        }
    })
    .await
}

It’s a bit harder for other crates to offer methods like that (instead of free-standing functions) because they couldn’t use async fn in a method of an extension trait. They could use a manually-written future struct, or a boxed trait object.
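
A minimal sketch of the boxed-trait-object route, assuming one is happy to pay for a heap allocation per call. The names `OptionBoxedExt` and `boxed_and_then` are hypothetical, not taken from any existing crate:

```rust
use std::future::Future;
use std::pin::Pin;

// Hypothetical extension trait; returns a boxed future instead of a named type.
pub trait OptionBoxedExt<T> {
    fn boxed_and_then<'a, U, F, Fut>(
        self,
        f: F,
    ) -> Pin<Box<dyn Future<Output = Option<U>> + 'a>>
    where
        T: 'a,
        U: 'a,
        F: FnOnce(T) -> Fut + 'a,
        Fut: Future<Output = Option<U>> + 'a;
}

impl<T> OptionBoxedExt<T> for Option<T> {
    fn boxed_and_then<'a, U, F, Fut>(
        self,
        f: F,
    ) -> Pin<Box<dyn Future<Output = Option<U>> + 'a>>
    where
        T: 'a,
        U: 'a,
        F: FnOnce(T) -> Fut + 'a,
        Fut: Future<Output = Option<U>> + 'a,
    {
        // The async block does the same work as the free function above,
        // and boxing it erases the unnameable future type.
        Box::pin(async move {
            match self {
                None => None,
                Some(x) => f(x).await,
            }
        })
    }
}

// Stand-in for the async operation from the question.
async fn do_some_async(_num: i32) -> bool {
    true
}

async fn uses_boxed(input: Option<i32>) -> Option<u32> {
    input
        .boxed_and_then(|num| async move {
            if do_some_async(num).await {
                Some(42)
            } else {
                None
            }
        })
        .await
}
```

The trade-off is an allocation and dynamic dispatch per call, in exchange for not having to write a future type by hand.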


Edit: I had some fun writing out an explicit future state machine for such a method; it’s fairly unrelated to your question, but take a look if you’re interested anyway.

starting point:

async fn option_and_then<T, U, F, Fut>(x: Option<T>, f: F) -> Option<U>
where
    F: FnOnce(T) -> Fut,
    Fut: Future<Output = Option<U>>,
{
    f(x?).await
}

i.e.

async fn async_and_then_desugared<T, U, F, Fut>(x: Option<T>, f: F) -> Option<U>
where
    F: FnOnce(T) -> Fut,
    Fut: Future<Output = Option<U>>,
{
    match x {
        None => None,
        Some(x) => f(x).await,
    }
}

We want an extension trait

pub trait OptionExt<T> {
    fn async_and_then<U, F, Fut>(self, f: F) -> OptionAndThen<T, F, Fut>
    where
        F: FnOnce(T) -> Fut,
        Fut: Future<Output = Option<U>>;
}

and implement it

impl<T> OptionExt<T> for Option<T> {
    fn async_and_then<U, F, Fut>(self, f: F) -> OptionAndThen<T, F, Fut>
    where
        F: FnOnce(T) -> Fut,
        Fut: Future<Output = Option<U>>,
    {
        todo!()
    }
}

so that we can ultimately use it as

// Some async op. E.g., access database.
async fn do_some_async(_num: i32) -> bool {
    true
}

async fn it_works(input: Option<i32>) -> Option<u32> {
    input
        .async_and_then(|num| async move {
            if do_some_async(num).await {
                Some(42)
            } else {
                None
            }
        })
        .await
}

Okay, so we need to implement the struct that’s the return type. Looking back at

async fn async_and_then_desugared<T, U, F, Fut>(x: Option<T>, f: F) -> Option<U>
where
    F: FnOnce(T) -> Fut,
    Fut: Future<Output = Option<U>>,
{
    match x {
        None => None,
        Some(x) => f(x).await,
    }
}

there’s one await point in this function. Our Future as a state machine needs 1+3 states then. The 3 additional states are:

  • initial state (nothing done yet, we just have the arguments stored)
  • done state (already returned Poll::Ready, next call to .poll should panic)
  • poisoned state (if the call to f panics, we cannot .poll the same future again either, in case someone catches the panic)

The state for the .await needs only the future that’s being awaited; no other local variables or temporaries are left in scope.

use pin_project::pin_project;

#[pin_project(project = OptionAndThenProj, project_replace = OptionAndThenProjOwn)]
pub enum OptionAndThen<T, F, Fut> {
    Initial(Option<T>, F),
    Polling(#[pin] Fut),
    Poisoned,
    Done,
}

This uses pin_project in order to support Fut: !Unpin.
We can now resolve the todo!() from the trait implementation:

impl<T> OptionExt<T> for Option<T> {
    fn async_and_then<U, F, Fut>(self, f: F) -> OptionAndThen<T, F, Fut>
    where
        F: FnOnce(T) -> Fut,
        Fut: Future<Output = Option<U>>,
    {
        OptionAndThen::Initial(self, f)
    }
}

Now all that’s left is to implement the state machine

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

impl<T, U, F, Fut> Future for OptionAndThen<T, F, Fut>
where
    F: FnOnce(T) -> Fut,
    Fut: Future<Output = Option<U>>,
{
    type Output = Option<U>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        loop {
            break match self.as_mut().project() {
                OptionAndThenProj::Initial(_, _) => {
                    match self.as_mut().project_replace(OptionAndThen::Poisoned) {
                        OptionAndThenProjOwn::Initial(x, f) => match x {
                            None => {
                                self.set(OptionAndThen::Done);
                                Poll::Ready(None)
                            }
                            Some(x) => {
                                self.set(OptionAndThen::Polling(f(x)));
                                continue;
                            }
                        },
                        _ => unreachable!(),
                    }
                }
                OptionAndThenProj::Polling(fut) => match fut.poll(cx) {
                    Poll::Pending => Poll::Pending,
                    Poll::Ready(x) => {
                        self.set(OptionAndThen::Done);
                        Poll::Ready(x)
                    }
                },
                OptionAndThenProj::Poisoned => {
                    panic!("OptionAndThen future poisoned")
                }
                OptionAndThenProj::Done => {
                    panic!("OptionAndThen future already done")
                }
            };
        }
    }
}

Again, compare this to

async fn async_and_then_desugared<T, U, F, Fut>(x: Option<T>, f: F) -> Option<U>
where
    F: FnOnce(T) -> Fut,
    Fut: Future<Output = Option<U>>,
{
    match x {
        None => None,
        Some(x) => f(x).await,
    }
}

and you can see that the match x is still there; the f(x).await turns into the OptionAndThen::Polling(f(x)) state. When returning Poll::Ready, we always go to the Done state. The Poisoned state supports mem::take-style processing of the state enum: it is a temporary stand-in that’s only ever left in place if we exit poll on a panic path. (The only way this can happen is if the call to f(x) panics.)
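
The stand-in trick is not specific to futures. A minimal sketch on an ordinary enum, using `mem::replace` and a hypothetical `Job` type invented for this illustration:

```rust
use std::mem;

// Hypothetical state enum, unrelated to futures, for illustration only.
#[derive(Debug, PartialEq)]
enum Job {
    Pending(String),
    Finished(usize),
    Poisoned,
}

fn advance(job: &mut Job, f: impl FnOnce(String) -> usize) {
    // Take the current state by value, leaving Poisoned behind as a stand-in.
    match mem::replace(job, Job::Poisoned) {
        Job::Pending(text) => {
            // If `f` panics here, `job` is left in the Poisoned state.
            let len = f(text);
            *job = Job::Finished(len);
        }
        // Nothing to do; put the finished state back.
        Job::Finished(n) => *job = Job::Finished(n),
        Job::Poisoned => panic!("job poisoned by an earlier panic"),
    }
}
```

Taking the state by value is what lets the `Pending` arm move `text` into `f`; the stand-in only becomes observable if `f` unwinds before the real next state is written back.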

Full code in the playground (everything’s untested, no guarantees that it actually does the right thing).


I dabbled with your works3 for a while, but it is still hard to chain multiple async ops.

I also tried converting Option<T> to Result<T, ()> to borrow some support from TryFutureExt. Sharing that first:

async fn chain_more_op(input: Option<i32>) -> Option<i32> {
    use futures::{future, TryFutureExt};

    // convert `Option<T>` to `Result<T, ()>` then use `TryFutureExt` trait
    future::ready(input.ok_or(()))
        .and_then(|num: i32| async move {
            if do_some_async(num).await {
                Ok(42)
            } else {
                Err(())
            }
        })
        // try to chain other op
        .and_then(|num: i32| async move {
            if do_some_async(num).await {
                Ok(num + 5)
            } else {
                Err(())
            }
        })
        .await
        .ok() // convert Result back to Option
}

What a detailed description of the state machine! Thank you. I must give it a try.

There are a lot of things I need to figure out, but that's perfectly OK and exciting. I'm going to revisit the docs for the async parts (Pin, poll(), pin_project, Context, etc.). I hadn't read them seriously before because I didn't know how they related to the code I was writing. But now I get it XD, and this is a perfect chance to dig in.

It may take a while, so thank you in advance.

