How to poll mulitple futures in fn poll()?

I have a future and it needs to poll two different futures in its fn poll, I couldn't work out why it the futures are executed in a strange way

impl<F, E> Future for ResponseFuture<F>
where
    F: Future<Output = Result<String, E>>,
{
    type Output = F::Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();

        // First future
        // Get back the HTTP response
        println!("polling, before 1st future");
        let res = ready!(this.inner.poll(cx)?);

        // Second future
        // save the url returned by the `res`
        println!("polling, before 2nd future");
        let url: DbUrl = ready!(std::pin::pin!(save("a url from the resp")).poll(cx)).unwrap();

        println!("all done");
        Poll::Ready(Ok(res))
    }
}

// source: https://docs.rs/sea-orm/2.0.0-rc.27/sea_orm/entity/trait.ActiveModelTrait.html#method.save
fn save(_url: &str) -> Pin<Box<dyn Future<Output = Result<DbUrl, DbErr>> + Send>>
where
{
    Box::pin(async { Ok(DbUrl) })
}

I tried to reduce it to a playground example but I just couldn't reproduce the problem, on the playground example, it runs without problem

polling, before 1st future
polling, before 2nd future
all done

However, with my actual code, the behaviour is strange

// possible outcome 1
polling, before 1st future
polling, before 1st future
polling, before 1st future
polling, before 1st future
polling, before 1st future
polling, before 2nd future  << stuck here

// possible outcome 2
polling, before 1st future
polling, before 1st future
polling, before 1st future
polling, before 1st future
polling, before 1st future
polling, before 2nd future
polling, before 1st future

thread 'tokio-runtime-worker' (265203) panicked at /home/chung/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/hyper-util-0.1.19/src/client/legacy/client.rs:246:57:
`async fn` resumed after completion
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrac


// possible outcome 3
polling, before 1st future
polling, before 1st future
polling, before 1st future
polling, before 1st future
polling, before 1st future
polling, before 2nd future
polling, before 1st future
polling, before 1st future
polling, before 1st future
polling, before 1st future
polling, before 1st future
polling, before 2nd future
polling, before 1st future
polling, before 1st future
polling, before 1st future
polling, before 1st future
polling, before 1st future
polling, before 2nd future
polling, before 1st future
polling, before 1st future
polling, before 1st future
polling, before 1st future
polling, before 1st future
polling, before 2nd future
polling, before 1st future
polling, before 1st future
polling, before 1st future
polling, before 1st future
polling, before 1st future
polling, before 2nd future
polling, before 1st future
polling, before 1st future
polling, before 1st future
polling, before 1st future
polling, before 1st future
polling, before 2nd future  <<< stuck

My actual code

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
    let this = self.project();

    println!("polling, before 1st future");
    let res = ready!(this.inner.poll(cx)?);

    let cur_url = url::ActiveModel {
        ..Default::default()
    };

    println!("polling, before 2nd future");
    let cur_url = ready!(pin!(cur_url.save(this.db)).poll(cx));

    println!("all done");

    Poll::Ready(Ok(res))
}

This is creating a new future every time your outer future is polled.

You'll want to store the Future returned by save and poll that instance instead.

Once this completes you poll the other future, but if that doesn't complete immediate you return and at the next poll this is called again. Polling an already complete future is unspecified behaviour, which includes working as expected, panicking, starting again, etc etc

3 Likes

You have to keep in mind that poll() may need to be called many many times for a single thing to finish. It's not a one-time call, but a Groundhog day movie.

ready! does NOT wait for it to finish. This is not .await. It immediately gives up if the result wasn't immediately ready.

here is why people don't write futures by hand

pin_project! {
    struct ResponseFuture<F : Future> {
        #[pin]
        inner: RespState<F>,
        url: String,
        db: DbConn,
    }
}

pin_project! {
    #[project = StateProj]
    enum RespState<F : Future> {
        First { #[pin] f :  F },
        SecondOrThird { s_o_t : RespEndState<F> }
    }
}

enum RespEndState<F : Future> {
    Second (Pin<Box<dyn Future<Output = Result<DbUrl, DbErr>> + Send>>, <F as Future>::Output),
    Done
}



impl<F, E> Future for ResponseFuture<F>
where
    F: Future<Output = Result<String, E>>,
{
    type Output = F::Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut this = self.project();
        let states = this.inner.as_mut().project();
        if let StateProj::First { f } = this.inner.as_mut().project() {
            println!("polling, before 1st future");
            let res = ready!(f.poll(cx)?);
            this.inner.set(RespState::SecondOrThird { s_o_t : RespEndState::Second(save("a url from the resp"), Ok(res))});
        }
        if let StateProj::SecondOrThird { s_o_t  } = this.inner.as_mut().project() {
            if let RespEndState::Second(f,_) = s_o_t {
                println!("polling, before 2nd future");
                let _url: DbUrl = ready!(f.as_mut().poll(cx)).unwrap();
                let RespEndState::Second(_,o) = core::mem::replace(s_o_t, RespEndState::Done) else {
                    unreachable!()
                };
                println!("all done");
                return Poll::Ready(o)
                
            }
        }
        panic!("future resumed after completion")
    }
}

this would be slightly more beareable by using pin_project which has a nice feature project_replace, making recuperating the output less painful.
the 2 enums instead of one are only because of the need for project_replace

I have to pass DbConn from the pinned struct

pin_project! {
    struct ResponseFuture<F> {
        #[pin]
        inner: RespState<F>,
        url: String,
        db: DbConn,
    }
}

pin_project! {
    #[project = StateProj]
    enum RespState<F : Future> {
        First { #[pin] f :  F },
        SecondOrThird { s_o_t : RespEndState<F> }
    }
}

enum RespEndState<F: Future> {
    Second(
        Pin<Box<dyn Future<Output = Result<DbUrl, DbErr>> + Send>>,
        <F as Future>::Output,
    ),
    Done,
}

into the trait method save

fn save<'a, 'async_trait, C>(
    self,
    db: &'a C,
) -> Pin<Box<dyn Future<Output = Result<Self, DbErr>> + Send + 'async_trait>> {..}


if let StateProj::First { f } = this.inner.as_mut().project() {
    let res = ready!(f.poll(cx)?);

    let cur_url = url::ActiveModel {
        ..Default::default()
    };

    this.inner.set(ResponseState::SecondOrThird {
        // problem                                                              vvvvvv
        s_o_t: ResponseEndState::Second(cur_url.save(this.db), Ok(res)),
    });
}

if let StateProj::SecondOrThird { s_o_t } = this.inner.as_mut().project() {
    if let ResponseEndState::Second(f, _) = s_o_t {
        let url: url::ActiveModel = ready!(f.as_mut().poll(cx)).unwrap();

        let ResponseEndState::Second(_, res) =
            core::mem::replace(s_o_t, ResponseEndState::Done)
        else {
            unreachable!()
        };

        return Poll::Ready(res);
    }
}

error: lifetime may not live long enough
  --> src/extraction.rs:78:49
   |
57 |     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
   |                       - let's call the lifetime of this reference `'1`
...
78 |                 s_o_t: ResponseEndState::Second(cur_url.save(this.db), Ok(res)),
   |                                                 ^^^^^^^^^^^^^^^^^^^^^ this usage requires that `'1` must outlive `'static`

I don't know how to pass DbConn as a reference that satisifies save.

well what you need to do here is be self referential.
self referential futures always require very abstract and easy to get wrong use of unsafe.
if there is any way you can, i would advise you to avoid them at all cost.
my advice would be, if it is at all possible, make a

struct ResponseFuture<'a, F> {
        #[pin]
        inner: RespState<'a, F>,
        url: String,
        db: &'a DbConn,
    }
    enum RespState<'a, F : Future> {
        First { #[pin] f :  F },
        SecondOrThird { s_o_t : RespEndState<'a, F> }
    }
enum RespEndState<'a, F: Future> {
    Second(
        Pin<Box<dyn Future<Output = Result<DbUrl, DbErr>> + Send + 'a>>,
        <F as Future>::Output,
    ),
    Done,
}

this would make your future not self referential.
if that is not something you want to do, then you will have to get into very tricky stuff

if you really want to do the self referential verion, i can tell you that this one is sound, as long as you don't change one bit.
even something that looks like it has nothing to do with the unsafe code could easily make it UB.

and ofc here is the safe version with a reference, which i would strongly recommend using over the other if posible :

1 Like