FuturesUnordered

Here's a more or less direct translation to using Tokio and delay_for.

use futures::stream::{FuturesUnordered, StreamExt};
use tokio::time::{Duration, delay_for};
use tokio::runtime::Runtime;

use rand::distributions::{Distribution, Uniform};

async fn random_sleep(num: i64) -> i64 {
    let mut rng = rand::thread_rng();
    let secs: u64 = Uniform::from(2..10).sample(&mut rng);
    let d = Duration::from_secs(secs);
    println!("--> [{}] sleep {} seconds", num, secs);
    delay_for(d).await;
    println!("<-- [{}] slept {} seconds", num, secs);
    num
}

async fn run() {
    let mut cnt = 0;
    let mut workers = FuturesUnordered::new();

    while workers.len() < 5 {
        workers.push(random_sleep(cnt));
        cnt += 1;
    }

    loop {
        match workers.next().await {
            Some(result) => {
                println!("    finished future [{}]", result);
                if cnt < 20 {
                    workers.push(random_sleep(cnt));
                    cnt += 1;
                }
            }
            None => {
                println!("Done!");
                break;
            }
        }
    }
}

// Could also use #[tokio::main]
fn main() {
    let mut runtime = Runtime::new().unwrap();
    runtime.block_on(run());
}

You can't easily call poll_next. If you look at the definition, you can see that you need to both pin the stream and provide a context. This is the job of Tokio, and you mostly shouldn't have to care about this, and this is also why next() exists. The context is how the stream sets up notifications for the executor when the stream is ready with the next item.

Not at all! Async await provides alternatives to all of your blocking methods, that you need to use instead. For example, Tokio has everything from tcp streams, timers, file operations that are implemented in a way such that they cooperate with the executor by yielding back control while they wait for IO or timers. This means that you can run thousands of things concurrently on a single thread, but only if the tasks cooperate by not spending a lot of time inside poll.

Tokio does provide an escape hatch for when you really have to run blocking code: spawn_blocking, however this works internally by running it on a large thread pool with up to (by defalut) 500 threads, and your blocking task will be monopolizing an entire thread for the duration of its running time.

Async await is not magic. The translation that the compiler performs on your async function is very impressive, but there's nothing it can do if your futures don't cooperate with the executor.

8 Likes