FuturesUnordered

Hi,

I am fiddling with futures and trying to build a fixed-size pool of futures that do work. The idea is to keep the number of futures in a FuturesUnordered constant and spawn a new future whenever one finishes.

The problem, however, is that although I can create multiple futures, they do not run in parallel. I read in the docs that I have to call poll_next after inserting a new future into the list, but the compiler says that method does not exist.

Maybe my idea is wrong and there is a better way to achieve this in Rust.

Here is a minimal example, which runs sequentially:

use std::thread::{sleep};
use std::time::Duration;
use futures::prelude::*;
use futures::stream::{FuturesUnordered, StreamExt};
use futures::executor::block_on;

use rand::distributions::{Distribution, Uniform};

async fn random_sleep(num: i64) -> i64 {
  let mut rng = rand::thread_rng();
  let secs: u64 = Uniform::from(2..10).sample(&mut rng);
  let d = Duration::from_secs(secs);
  println!("--> [{}] sleep {} seconds", num, secs);
  sleep(d);
  println!("<-- [{}] slept {} seconds", num, secs);
  num
}

async fn run () {
  let mut cnt = 0;
  let mut workers = FuturesUnordered::new();

  while workers.len() < 5 {
    workers.push( random_sleep(cnt) );
    cnt += 1;
  }

  loop {
    match workers.next().await {
      Some(result) => {
        println!("    finished future [{}]", result);
        if cnt < 20 {
          workers.push( random_sleep(cnt) );
          cnt += 1; // advance the id so that no more futures are pushed after 20
        }
      },
      None => {
        println!("Done!");
        break;
      }
    }
  }
}

fn main() {
  block_on(run());
}

As I am trying to learn Rust more deeply, I would like to depend only on standard features and not on third-party libraries like Tokio.

The poll_next method is defined on the Stream trait, so you have to import that trait to use it. However, the next() method you found on StreamExt calls poll_next internally, so using next() is fine.
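To illustrate how next() can sit on top of poll_next, here is a minimal std-only sketch. It assumes a local trait named Stream that copies only the poll_next signature of futures::Stream, and a hypothetical Next future standing in for the one StreamExt::next() returns; the real futures types differ in detail. Counter, NoopWake and block_on are illustrative helpers.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Local stand-in for `futures::Stream` with the same `poll_next` signature
// (assumption: simplified copy so the sketch needs only std).
trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

// Hypothetical equivalent of the future returned by `StreamExt::next()`:
// every poll just forwards to `poll_next` on the borrowed stream.
struct Next<'a, S: Stream + Unpin> {
    stream: &'a mut S,
}

impl<S: Stream + Unpin> Future for Next<'_, S> {
    type Output = Option<S::Item>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // `Pin::new` is allowed here because `S: Unpin`.
        Pin::new(&mut *self.stream).poll_next(cx)
    }
}

// A toy stream that yields 0, 1, ..., limit - 1 and is always ready.
struct Counter {
    n: u32,
    limit: u32,
}

impl Stream for Counter {
    type Item = u32;

    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<u32>> {
        if self.n < self.limit {
            self.n += 1;
            Poll::Ready(Some(self.n - 1))
        } else {
            Poll::Ready(None)
        }
    }
}

// Waker that ignores notifications; enough for a stream that never waits.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Busy-polling driver: acceptable only because nothing here ever waits.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

// Roughly what `while let Some(x) = stream.next().await { ... }` does.
fn demo() -> Vec<u32> {
    let mut stream = Counter { n: 0, limit: 3 };
    let mut items = Vec::new();
    while let Some(x) = block_on(Next { stream: &mut stream }) {
        items.push(x);
    }
    items
}
```

demo() collects 0, 1 and 2. The busy-polling block_on is only acceptable because Counter never returns Pending; a real executor would wait for the waker instead.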

However, the reason your code does not behave as you expect is that you are blocking inside an async function. std::thread::sleep does not yield control back to the executor while it is sleeping (you can tell because there is no .await). You want to use Tokio's delay_for instead; however, this requires the Tokio runtime, so you will need to use that instead of block_on from the futures crate.
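As a rough illustration of the difference, here is a std-only sketch of a sleep that yields instead of blocking, in the spirit of the timer example in the Rust Async Book. ThreadSleep, ChannelWake and demo are hypothetical names. The blocking std::thread::sleep still happens, but on a separate helper thread, so poll itself returns Poll::Pending right away, and the helper calls the stored Waker when the time is up. Spawning one thread per sleep is fine for a sketch but is not how Tokio's timer works.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::mpsc::{self, Sender};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread;
use std::time::{Duration, Instant};

// State shared between the future and its helper thread.
struct Shared {
    done: bool,
    waker: Option<Waker>,
}

// Hypothetical std-only non-blocking sleep: the blocking `thread::sleep`
// runs on a helper thread, so `poll` itself returns immediately.
struct ThreadSleep {
    shared: Arc<Mutex<Shared>>,
}

impl ThreadSleep {
    fn new(d: Duration) -> Self {
        let shared = Arc::new(Mutex::new(Shared { done: false, waker: None }));
        let remote = Arc::clone(&shared);
        thread::spawn(move || {
            thread::sleep(d); // blocks the helper thread, not the executor
            let waker = {
                let mut st = remote.lock().unwrap();
                st.done = true;
                st.waker.take()
            };
            // Tell whoever is waiting that the future can make progress.
            if let Some(w) = waker {
                w.wake();
            }
        });
        ThreadSleep { shared }
    }
}

impl Future for ThreadSleep {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let mut st = self.shared.lock().unwrap();
        if st.done {
            Poll::Ready(())
        } else {
            // Register (or refresh) the waker, then yield instead of blocking.
            st.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}

// Waker that reports wake-ups over a channel so the demo can observe them.
struct ChannelWake(Mutex<Sender<()>>);

impl Wake for ChannelWake {
    fn wake(self: Arc<Self>) {
        let _ = self.0.lock().unwrap().send(());
    }
}

// Polls a 50 ms ThreadSleep by hand. Returns (first poll was Pending,
// time spent inside the first poll, final poll was Ready).
fn demo() -> (bool, Duration, bool) {
    let (tx, rx) = mpsc::channel();
    let waker = Waker::from(Arc::new(ChannelWake(Mutex::new(tx))));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(ThreadSleep::new(Duration::from_millis(50)));

    let start = Instant::now();
    let first_pending = fut.as_mut().poll(&mut cx).is_pending();
    let first_poll_took = start.elapsed();
    if first_pending {
        rx.recv().unwrap(); // wait for the helper thread to call wake()
    }
    let finally_ready = fut.as_mut().poll(&mut cx).is_ready();
    (first_pending, first_poll_took, finally_ready)
}
```

The first poll returns Pending almost immediately instead of sleeping for 50 ms, and only after the waker fires does a second poll return Ready. That early return is what gives an executor such as FuturesUnordered the chance to poll other futures in the meantime.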

To get some understanding of what's going on: the compiler translates your async fn into an object whose type implements the Future trait. This object has a method called poll, which is supposed to do a little bit of work and then return. Additionally, if it is not finished yet, it should set up some sort of notification for when it is ready to continue (e.g. after the sleep). FuturesUnordered can run many things concurrently by calling poll on all of its futures, and since each call returns quickly, it gets around to polling all of them.
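The poll contract described above can be sketched with std only. Steps is a hypothetical future that does one unit of work per poll and wakes itself before returning Pending. run_all is a deliberately simplified stand-in for FuturesUnordered that re-polls every unfinished future each round; the real type only re-polls futures whose wakers fired.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// A cooperative future: each poll does one small "step" and returns
// quickly. Before returning Pending it wakes itself, standing in for
// "notify the executor when I can continue".
struct Steps {
    id: u32,
    left: u32,
}

impl Future for Steps {
    type Output = u32;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u32> {
        if self.left == 0 {
            return Poll::Ready(self.id);
        }
        self.left -= 1; // one small unit of work
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Toy driver: polls every unfinished future once per round. Unlike the real
// FuturesUnordered, it ignores wake-ups and simply re-polls everything.
fn run_all(mut futs: Vec<Pin<Box<dyn Future<Output = u32>>>>) -> Vec<u32> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut finished = Vec::new();
    while !futs.is_empty() {
        futs.retain_mut(|f| match f.as_mut().poll(&mut cx) {
            Poll::Ready(id) => {
                finished.push(id);
                false // drop finished futures
            }
            Poll::Pending => true,
        });
    }
    finished
}

// Futures 0, 1 and 2 need 3, 1 and 2 steps respectively.
fn demo() -> Vec<u32> {
    let mut futs: Vec<Pin<Box<dyn Future<Output = u32>>>> = Vec::new();
    for (id, left) in [(0, 3), (1, 1), (2, 2)] {
        futs.push(Box::pin(Steps { id, left }));
    }
    run_all(futs)
}
```

Because every poll returns quickly, the three futures interleave and finish in the order 1, 2, 0, i.e. by how much work they need rather than the order they were added. If one of them called std::thread::sleep inside poll, every round would stall on it.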

However, when you use a blocking sleep call that doesn't yield control back via .await, a single call to poll takes a long time, so FuturesUnordered cannot call poll on the other futures while your future is sleeping inside its own call to poll.

You say you want to depend on standard features and not on Tokio, but the simple answer is that then you can't really use async/await without writing the missing pieces yourself. The standard library only has exactly what is needed to make async functions a supported language feature (the Future trait and the Poll, Context and Waker types). It has nothing that lets you run a future, nor any other tools for using them. Note that one thing Tokio provides is a timer module that lets the poll function register a wake-up for when the sleep timer runs out, but if you are not running inside the Tokio runtime, this timer is not available, and the sleep call will fail because it is not able to register its own wake-up.
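To show what running a future without an external crate involves, here is a minimal sketch of a single-future executor built only from std (Future, Context, Waker and the Wake trait), assuming Rust 1.68 or later for std::pin::pin!. It works roughly like futures::executor::block_on: poll the future, and if it is pending, park the thread until a waker unparks it. ThreadWaker, YieldOnce and add_after_yield are illustrative names.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

// Waker that unparks the thread blocked in `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

// Minimal single-future executor using only std: poll, and if the future
// is not ready, park the thread until some waker unparks it.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            // Spurious unparks are harmless: we simply poll again.
            Poll::Pending => thread::park(),
        }
    }
}

// A future that yields once (waking itself) before completing.
struct YieldOnce(bool);

impl Future for YieldOnce {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            Poll::Ready(())
        } else {
            self.0 = true;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

async fn add_after_yield(a: u32, b: u32) -> u32 {
    YieldOnce(false).await;
    a + b
}

fn demo() -> u32 {
    block_on(add_after_yield(2, 3))
}
```

This executor only drives one future. A timer or socket future still needs something that calls its waker later, which is the part a runtime such as Tokio supplies.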


Hm, I thought this is what async is for? I mean, wrapping blocking code in non-blocking functions that return once the blocking part has finished.

Even when I extend the use clause like this:

use futures::stream::{FuturesUnordered, StreamExt, Stream};

I am not able to call workers.poll_next(): method not found in 'futures_util::stream::futures_unordered::FuturesUnordered<impl core::future::future::Future>'

I was thinking that the problem is that workers.next().await blocks execution, and that I should call poll_next() myself and check for Poll::Ready(Some(result)).

So you are telling me that I have to use Tokio to solve this? Could you provide a sketch or minimal example of how to do it with Tokio?

Here's a more or less direct translation to using Tokio and delay_for.

use futures::stream::{FuturesUnordered, StreamExt};
use tokio::time::{Duration, delay_for}; // Tokio 0.2 API; Tokio 1.x calls this tokio::time::sleep
use tokio::runtime::Runtime;

use rand::distributions::{Distribution, Uniform};

async fn random_sleep(num: i64) -> i64 {
    let mut rng = rand::thread_rng();
    let secs: u64 = Uniform::from(2..10).sample(&mut rng);
    let d = Duration::from_secs(secs);
    println!("--> [{}] sleep {} seconds", num, secs);
    delay_for(d).await;
    println!("<-- [{}] slept {} seconds", num, secs);
    num
}

async fn run() {
    let mut cnt = 0;
    let mut workers = FuturesUnordered::new();

    while workers.len() < 5 {
        workers.push(random_sleep(cnt));
        cnt += 1;
    }

    loop {
        match workers.next().await {
            Some(result) => {
                println!("    finished future [{}]", result);
                if cnt < 20 {
                    workers.push(random_sleep(cnt));
                    cnt += 1;
                }
            }
            None => {
                println!("Done!");
                break;
            }
        }
    }
}

// Could also use #[tokio::main]
fn main() {
    let mut runtime = Runtime::new().unwrap();
    runtime.block_on(run());
}

You can't easily call poll_next. If you look at its definition, you can see that you need to both pin the stream and provide a context. Providing the context is the job of the executor (here Tokio), so you mostly shouldn't have to care about it, and this is also why next() exists. The context is how the stream sets up notifications to the executor for when it is ready with the next item.
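For the curious, here is a std-only sketch of calling poll_next by hand. It assumes a local Stream trait copying the poll_next signature of futures::Stream, a toy Counter stream and a NoopWake waker (all illustrative). It shows both requirements: the receiver must be Pin<&mut Self>, and a Context has to be built from a Waker. For Unpin streams such as FuturesUnordered, the futures crate also offers StreamExt::poll_next_unpin as a shortcut for the pinning part, but you still need a Context.

```rust
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Local stand-in for `futures::Stream` with the same `poll_next` signature
// (assumption: keeps the sketch std-only).
trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

// Toy stream yielding 0, 1, ..., limit - 1; always ready.
struct Counter {
    n: u32,
    limit: u32,
}

impl Stream for Counter {
    type Item = u32;

    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<u32>> {
        if self.n < self.limit {
            self.n += 1;
            Poll::Ready(Some(self.n - 1))
        } else {
            Poll::Ready(None)
        }
    }
}

struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

fn demo() -> Vec<Option<u32>> {
    let mut stream = Counter { n: 0, limit: 2 };
    // An executor normally supplies the Context; here we build one by hand
    // from a waker that ignores notifications.
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut items = Vec::new();
    for _ in 0..3 {
        // `stream.poll_next(&mut cx)` would not compile: the receiver must be
        // `Pin<&mut Counter>`. Counter is Unpin, so `Pin::new` is enough.
        match Pin::new(&mut stream).poll_next(&mut cx) {
            Poll::Ready(item) => items.push(item),
            Poll::Pending => unreachable!("Counter never returns Pending"),
        }
    }
    items
}
```

The third call reports None, i.e. the stream is exhausted. With a stream that can return Pending, you would also have to arrange to poll again only after the waker fires, which is the part next().await and the executor handle for you.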

Not at all! Async libraries provide alternatives to all of your blocking methods, and you need to use those instead. For example, Tokio provides everything from TCP streams to timers and file operations, implemented so that they cooperate with the executor by yielding control back while they wait for IO or timers. This means that you can run thousands of things concurrently on a single thread, but only if the tasks cooperate by not spending a lot of time inside poll.
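A rough sketch of the single-threaded case, using only std and loosely modeled on the executor chapter of the Rust Async Book. Task, run, YieldNow and worker are illustrative names. Each task holds a handle to a run queue, waking a task simply re-queues it, and the executor polls one task at a time on the current thread. Because every worker yields between its short steps, all 1,000 of them make progress on one thread.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::{channel, Sender};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

type BoxFut = Pin<Box<dyn Future<Output = ()> + Send>>;

// A spawned task: its future plus a handle to the run queue.
struct Task {
    fut: Mutex<Option<BoxFut>>,
    queue: Mutex<Sender<Arc<Task>>>,
}

impl Wake for Task {
    // Waking a task just puts it back on the run queue.
    fn wake(self: Arc<Self>) {
        let queue = self.queue.lock().unwrap().clone();
        let _ = queue.send(self);
    }
}

// Single-threaded executor: take a woken task, poll it once, repeat.
// It stops once every task has finished and all Senders are dropped.
fn run(futs: Vec<BoxFut>) {
    let (tx, rx) = channel::<Arc<Task>>();
    for fut in futs {
        let task = Arc::new(Task {
            fut: Mutex::new(Some(fut)),
            queue: Mutex::new(tx.clone()),
        });
        tx.send(task).unwrap();
    }
    drop(tx);
    while let Ok(task) = rx.recv() {
        let mut slot = task.fut.lock().unwrap();
        if let Some(mut fut) = slot.take() {
            let waker = Waker::from(Arc::clone(&task));
            let mut cx = Context::from_waker(&waker);
            if fut.as_mut().poll(&mut cx).is_pending() {
                *slot = Some(fut); // keep it until its waker fires again
            }
        }
    }
}

// Yields once, waking itself so the executor re-queues the task.
struct YieldNow(bool);

impl Future for YieldNow {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            Poll::Ready(())
        } else {
            self.0 = true;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

// A cooperative task: a few short steps, yielding between them.
async fn worker(done: Arc<AtomicUsize>) {
    for _ in 0..3 {
        YieldNow(false).await;
    }
    done.fetch_add(1, Ordering::SeqCst);
}

// Runs 1,000 interleaved tasks on the current thread; returns how many finished.
fn demo() -> usize {
    let done = Arc::new(AtomicUsize::new(0));
    let futs: Vec<BoxFut> = (0..1000)
        .map(|_| Box::pin(worker(Arc::clone(&done))) as BoxFut)
        .collect();
    run(futs);
    done.load(Ordering::SeqCst)
}
```

demo() returns 1000. A worker that blocked inside poll would hold up every other task on that thread, which is why cooperation matters.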

Tokio does provide an escape hatch for when you really have to run blocking code: spawn_blocking. However, it works by running the code on a large thread pool with up to 512 threads by default, and your blocking task will monopolize an entire thread for as long as it runs.

Async/await is not magic. The transformation the compiler performs on your async function is very impressive, but there's nothing it can do if your futures don't cooperate with the executor.


Thank you, your code works like a charm.
