Await future in a loop

I am trying to write a loop that stops when it receives a stop signal and otherwise keeps awaiting an input future, like this:

async fn loop_forever<T, F>(mut stop_signal: Receiver<()>, clean_up:T, job: F)
where T: Future<Output=()> , F: Future<Output=()>
{
    loop{
        match stop_signal.try_recv() {
            Ok(_) => {
                clean_up.await;
                break;
            }
            Err(Empty) => (),
            Err(_) => panic!("Unexpected receiver state"),
        }
       job.await;
    }
}

The compiler says job is used after being moved, so I tried this:

async fn loop_forever<T, F>(mut stop_signal: Receiver<()>, clean_up:T, job: F)
where T: Future<Output=()> , F: Future<Output=()>
{
    loop{
        match stop_signal.try_recv() {
            Ok(_) => {
                clean_up.await;
                break;
            }
            Err(Empty) => (),
            Err(_) => panic!("Unexpected receiver state"),
        }
       (&mut job).await;
    }
}

Now the compiler says that type F does not implement Unpin.

Why does this happen?

How do I do this correctly?

Normally a Future represents a single event/value. That's why .await consumes the future.

What do you want this job to represent? If it represents a sequence of events, use Stream. Or, if it represents some async task, why don't you take a closure that returns impl Future<Output=()> and call it repeatedly?
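
The closure suggestion could look roughly like the sketch below. It assumes the stop signal is a std::sync::mpsc::Receiver<()> (the original post does not say which channel it uses), and make_job, block_on and demo are hypothetical names added for illustration. The block_on here is a busy-polling stand-in for a real executor, only there to keep the example std-only.

```rust
use std::cell::Cell;
use std::future::Future;
use std::pin::pin;
use std::sync::mpsc::{channel, Receiver, TryRecvError};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Each call to `make_job` builds a fresh future, so no future is awaited twice.
async fn loop_forever<T, M, F>(stop_signal: Receiver<()>, clean_up: T, mut make_job: M)
where
    T: Future<Output = ()>,
    M: FnMut() -> F,
    F: Future<Output = ()>,
{
    loop {
        match stop_signal.try_recv() {
            Ok(()) => {
                clean_up.await;
                break;
            }
            Err(TryRecvError::Empty) => (),
            Err(TryRecvError::Disconnected) => panic!("Unexpected receiver state"),
        }
        make_job().await;
    }
}

// Waker that does nothing; enough for a busy-polling demo executor.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Demo-only executor (assumption): polls in a tight loop until the future is ready.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

// Runs the job until it sends the stop signal itself on its third run.
fn demo() -> (u32, bool) {
    let (tx, rx) = channel::<()>();
    let runs = Cell::new(0u32);
    let cleaned = Cell::new(false);
    let (runs_ref, tx_ref) = (&runs, &tx);
    block_on(loop_forever(
        rx,
        async { cleaned.set(true) },
        move || async move {
            runs_ref.set(runs_ref.get() + 1);
            if runs_ref.get() == 3 {
                tx_ref.send(()).unwrap();
            }
        },
    ));
    (runs.get(), cleaned.get())
}
```

Any state the job needs between runs can live outside the closure (here a Cell), and each iteration only borrows it.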

Job doesn't represent a sequence of events. I actually want a type that stores some state and can be polled many times until the stop signal is received.

.await itself .poll()s the underlying future repeatedly until it returns Poll::Ready(T), pausing itself between events. If you want to check some condition before each .poll(), you should implement the Future trait yourself.
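
A hand-written Future that checks a condition before each poll might look like the sketch below. UntilStopped, the AtomicBool stop flag, block_on and demo are all hypothetical names chosen for illustration, not anything from the original post. Boxing the job pins it, which sidesteps the Unpin error. One caveat: the flag is only re-checked when the task is polled again, so in real code whoever sets the flag would also need to wake the task.

```rust
use std::future::{poll_fn, Future};
use std::pin::{pin, Pin};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical wrapper: resolves to `false` if stopped, `true` if the job finished.
struct UntilStopped<F> {
    stop: Arc<AtomicBool>,
    job: Pin<Box<F>>, // boxing pins the job, so F need not be Unpin
}

impl<F: Future<Output = ()>> Future for UntilStopped<F> {
    type Output = bool;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<bool> {
        // Check the condition before every poll of the inner future.
        if self.stop.load(Ordering::SeqCst) {
            return Poll::Ready(false);
        }
        match self.job.as_mut().poll(cx) {
            Poll::Ready(()) => Poll::Ready(true),
            Poll::Pending => Poll::Pending,
        }
    }
}

// Waker that does nothing; enough for a busy-polling demo executor.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Demo-only executor (assumption): polls in a tight loop until the future is ready.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

fn demo() -> (usize, bool) {
    let stop = Arc::new(AtomicBool::new(false));
    let polls = Arc::new(AtomicUsize::new(0));
    let (stop2, polls2) = (stop.clone(), polls.clone());
    // A job that never finishes; it raises the stop flag on its third poll.
    let job = poll_fn(move |_cx: &mut Context<'_>| {
        if polls2.fetch_add(1, Ordering::SeqCst) + 1 == 3 {
            stop2.store(true, Ordering::SeqCst);
        }
        Poll::<()>::Pending
    });
    let finished = block_on(UntilStopped { stop, job: Box::pin(job) });
    (polls.load(Ordering::SeqCst), finished)
}
```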

But in this case, assuming the stop_signal is futures::channel::oneshot::Receiver<()>, this should be enough:

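use futures::{channel::oneshot::Receiver, future::FutureExt, pin_mut};
use std::future::Future;
async fn loop_forever<T, F>(stop_signal: Receiver<()>, cleanup: T, job: F)
where T: Future<Output=()>, F: Future<Output=()>
{
    // The receiver resolves to a Result, so ignore its value and run cleanup.
    let stop = stop_signal.then(|_| cleanup);
    pin_mut!(stop, job); // select requires Unpin futures
    futures::future::select(stop, job).await;
}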