Wait for futures in a loop?

I want to write something like this:

while has_data_in_stream && !time_to_stop {
   do_something_with_data_in_stream()
}
use tokio::{
    stream::{self, StreamExt},
    sync::oneshot,
};

// Original attempt from the question. This version does NOT compile; the
// compiler error it produces is quoted right after the snippet.
#[tokio::main]
async fn main() {
    let mut stream1 = stream::iter(vec![1, 2, 3]);

    let (stop_read, time_to_stop): (oneshot::Sender<()>, _) = oneshot::channel();
    loop {
        let next = tokio::select! {
            v = stream1.next() => v.unwrap(),
            // `time_to_stop` is used by value here, so the receiver is moved
            // into `select!` on the first iteration and is no longer
            // available on the next one -- this is the reported error.
            _ = time_to_stop => break,
            else => break,
        };

        println!("next: {}", next);
    }
}

But as a result I got this error:

19 |     let (stop_read, time_to_stop): (oneshot::Sender<()>, _) = oneshot::channel();
   |                     ------------ move occurs because `time_to_stop` has type `tokio::sync::oneshot::Receiver<()>`, which does not implement the `Copy` trait
...
23 |             _ = time_to_stop => break,
   |                 ^^^^^^^^^^^^ value moved here, in previous iteration of loop

How can I fix this without removing the loop?

I know I can fix it via select!(stream.for_each, time_to_stop), but that approach has another problem,
so I want to keep using a loop. Is that possible?

Futures aren't copyable, and you're passing your one and only copy of time_to_stop to be "eaten" by select!.

Delay can be awaited by reference, which doesn't destroy it in the process:

/// Repeatedly runs `some_async_work()` until a single 50 ms delay elapses.
///
/// The delay is created once, outside the loop, and polled through
/// `&mut timeout`, so `select!` only borrows it and the same future can be
/// awaited again on the next iteration.
#[tokio::main]
async fn main() {
    let mut timeout = time::delay_for(Duration::from_millis(50));

    loop {
        // `true` when the delay fired first, `false` when the work finished.
        let timed_out = tokio::select! {
            _ = &mut timeout => true,
            _ = some_async_work() => false,
        };

        if timed_out {
            println!("operation timed out");
            break;
        }

        println!("operation completed");
    }
}
1 Like

This is what I was looking for:

use std::time::Duration;
use tokio::{
    stream::{self, StreamExt},
    sync::oneshot,
    time,
};

/// Prints items from `stream1` until the stream is exhausted or the stop
/// signal arrives on the oneshot channel, whichever happens first.
///
/// A background task sends the stop signal after 100 ms, while each item
/// takes 50 ms to "process", so either exit path can be taken.
#[tokio::main]
async fn main() {
    let mut stream1 = stream::iter(vec![1, 2, 3]);

    let (stop_read, mut time_to_stop): (oneshot::Sender<()>, _) = oneshot::channel();
    tokio::spawn(async move {
        time::delay_for(Duration::from_millis(100)).await;
        // `send` fails only if the receiver has already been dropped.
        if stop_read.send(()).is_err() {
            eprintln!("somthing goes wrong");
        }
    });
    loop {
        let next = tokio::select! {
            // End the loop when the stream yields `None` instead of
            // unwrapping it, which would panic once the stream is exhausted.
            v = stream1.next() => match v {
                Some(item) => {
                    time::delay_for(Duration::from_millis(50)).await;
                    item
                }
                None => break,
            },
            // Poll the receiver through `&mut` so it is borrowed, not moved,
            // and can be polled again on later iterations.
            _ = &mut time_to_stop => {
                println!("time_to_stop trigger");
                break;
            }
            else => break,
        };

        println!("next: {}", next);
    }
}

@kornel, thanks a lot!