How to poll mulitple futures in fn poll()?

I have a future and it needs to poll two different futures in its fn poll, I couldn't work out why it the futures are executed in a strange way

impl<F, E> Future for ResponseFuture<F>
where
    F: Future<Output = Result<String, E>>,
{
    type Output = F::Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();

        // First future
        // Get back the HTTP response
        println!("polling, before 1st future");
        let res = ready!(this.inner.poll(cx)?);

        // Second future
        // save the url returned by the `res`
        println!("polling, before 2nd future");
        let url: DbUrl = ready!(std::pin::pin!(save("a url from the resp")).poll(cx)).unwrap();

        println!("all done");
        Poll::Ready(Ok(res))
    }
}

// source: https://docs.rs/sea-orm/2.0.0-rc.27/sea_orm/entity/trait.ActiveModelTrait.html#method.save
fn save(_url: &str) -> Pin<Box<dyn Future<Output = Result<DbUrl, DbErr>> + Send>>
where
{
    Box::pin(async { Ok(DbUrl) })
}

I tried to reduce it to a playground example but I just couldn't reproduce the problem, on the playground example, it runs without problem

polling, before 1st future
polling, before 2nd future
all done

However, with my actual code, the behaviour is strange

// possible outcome 1
polling, before 1st future
polling, before 1st future
polling, before 1st future
polling, before 1st future
polling, before 1st future
polling, before 2nd future  << stuck here

// possible outcome 2
polling, before 1st future
polling, before 1st future
polling, before 1st future
polling, before 1st future
polling, before 1st future
polling, before 2nd future
polling, before 1st future

thread 'tokio-runtime-worker' (265203) panicked at /home/chung/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/hyper-util-0.1.19/src/client/legacy/client.rs:246:57:
`async fn` resumed after completion
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrac


// possible outcome 3
polling, before 1st future
polling, before 1st future
polling, before 1st future
polling, before 1st future
polling, before 1st future
polling, before 2nd future
polling, before 1st future
polling, before 1st future
polling, before 1st future
polling, before 1st future
polling, before 1st future
polling, before 2nd future
polling, before 1st future
polling, before 1st future
polling, before 1st future
polling, before 1st future
polling, before 1st future
polling, before 2nd future
polling, before 1st future
polling, before 1st future
polling, before 1st future
polling, before 1st future
polling, before 1st future
polling, before 2nd future
polling, before 1st future
polling, before 1st future
polling, before 1st future
polling, before 1st future
polling, before 1st future
polling, before 2nd future
polling, before 1st future
polling, before 1st future
polling, before 1st future
polling, before 1st future
polling, before 1st future
polling, before 2nd future  <<< stuck

My actual code

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
    let this = self.project();

    println!("polling, before 1st future");
    let res = ready!(this.inner.poll(cx)?);

    let cur_url = url::ActiveModel {
        ..Default::default()
    };

    println!("polling, before 2nd future");
    let cur_url = ready!(pin!(cur_url.save(this.db)).poll(cx));

    println!("all done");

    Poll::Ready(Ok(res))
}

This is creating a new future every time your outer future is polled.

You'll want to store the Future returned by save and poll that instance instead.

Once this completes you poll the other future, but if that doesn't complete immediate you return and at the next poll this is called again. Polling an already complete future is unspecified behaviour, which includes working as expected, panicking, starting again, etc etc

3 Likes

You have to keep in mind that poll() may need to be called many many times for a single thing to finish. It's not a one-time call, but a Groundhog day movie.

ready! does NOT wait for it to finish. This is not .await. It immediately gives up if the result wasn't immediately ready.

here is why people don't write futures by hand

pin_project! {
    struct ResponseFuture<F : Future> {
        #[pin]
        inner: RespState<F>,
        url: String,
        db: DbConn,
    }
}

pin_project! {
    #[project = StateProj]
    enum RespState<F : Future> {
        First { #[pin] f :  F },
        SecondOrThird { s_o_t : RespEndState<F> }
    }
}

enum RespEndState<F : Future> {
    Second (Pin<Box<dyn Future<Output = Result<DbUrl, DbErr>> + Send>>, <F as Future>::Output),
    Done
}



impl<F, E> Future for ResponseFuture<F>
where
    F: Future<Output = Result<String, E>>,
{
    type Output = F::Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut this = self.project();
        let states = this.inner.as_mut().project();
        if let StateProj::First { f } = this.inner.as_mut().project() {
            println!("polling, before 1st future");
            let res = ready!(f.poll(cx)?);
            this.inner.set(RespState::SecondOrThird { s_o_t : RespEndState::Second(save("a url from the resp"), Ok(res))});
        }
        if let StateProj::SecondOrThird { s_o_t  } = this.inner.as_mut().project() {
            if let RespEndState::Second(f,_) = s_o_t {
                println!("polling, before 2nd future");
                let _url: DbUrl = ready!(f.as_mut().poll(cx)).unwrap();
                let RespEndState::Second(_,o) = core::mem::replace(s_o_t, RespEndState::Done) else {
                    unreachable!()
                };
                println!("all done");
                return Poll::Ready(o)
                
            }
        }
        panic!("future resumed after completion")
    }
}

this would be slightly more beareable by using pin_project which has a nice feature project_replace, making recuperating the output less painful.
the 2 enums instead of one are only because of the need for project_replace