How to handle a vector of async function pointers

Hello all, I'm new to Rust and coming from TypeScript/Haskell. I'm attempting to implement something along the lines of:

use std::future::Future;

async fn add_one(i: i32) -> Option<i32> {
    Some(i + 1)
}

async fn to_none(i: i32) -> Option<i32> {
    None
}

#[tokio::main]
async fn main() {
    let mut futures: Vec<fn(i32) -> dyn Future<Output = Option<i32>>> = vec![];

    futures.push(add_one); // Error
    futures.push(to_none); // Error
                           // mismatched types
                           // expected fn pointer `fn(_) -> (dyn std::future::Future<Output = std::option::Option<i32>> + 'static)`
                           //       found fn item `fn(_) -> impl std::future::Future {add_one}` rustc(E0308)

    for fut in &futures {
        match fut(1).await {
            // Error
            // the size for values of type `dyn std::future::Future<Output = std::option::Option<i32>>` cannot be known at compilation time
            // the trait `std::marker::Sized` is not implemented for `dyn std::future::Future<Output = std::option::Option<i32>>`
            // to learn more, visit <https://doc.rust-lang.org/book/ch19-04-advanced-types.html#dynamically-sized-types-and-the-sized-trait>
            // all local variables must have a statically known size
            // unsized locals are gated as an unstable feature rustc(E0277)
            Some(num) => println!("Future returned {:?}.", num),
            None => println!("Didn't get a number."),
        }
    }
}

As far as I can tell the first error has to do with the types defined for the futures vector. I would like to leave the add_one and to_none functions alone and not muck with their types, so changing the type for the futures vector seems appropriate, but I'm not sure how to do that either!

The second error is getting into correct usage of Sized. Unfortunately, the std docs for Sized are a little inscrutable to me. Maybe I should add + ?Sized somewhere, but I don't understand where that would be or why, which is not how I prefer to learn things. Any advice would be appreciated! Thanks.

Anyway, hope you are all enjoying your social distance and keeping safe, fed, and healthy!

The issue is that add_one and to_none do not return dyn Future. They return some specific type that implements the Future trait, and not a trait object. Additionally the two functions return different concrete implementations of the Future trait, so they have different signatures, so you can't put them in a vector that requires them to have the same type.

One option is to box the futures, which unifies them into a single type:

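use std::{future::Future, pin::Pin};

async fn add_one(i: i32) -> Option<i32> {
    Some(i + 1)
}
// Wrapper that erases the concrete future type behind a boxed trait object.
fn add_one_async(i: i32) -> Pin<Box<dyn Future<Output = Option<i32>>>> {
    Box::pin(add_one(i))
}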

async fn to_none(_i: i32) -> Option<i32> {
    None
}
fn to_none_async(i: i32) -> Pin<Box<dyn Future<Output = Option<i32>>>> {
    Box::pin(to_none(i))
}

#[tokio::main]
async fn main() {
    let mut futures: Vec<fn(i32) -> Pin<Box<dyn Future<Output = Option<i32>>>>> = vec![];

    futures.push(add_one_async);
    futures.push(to_none_async);

    for fut in &futures {
        match fut(1).await {
            Some(num) => println!("Future returned {:?}.", num),
            None => println!("Didn't get a number."),
        }
    }
}
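If a named wrapper per function feels heavy, the same boxing can be done with closures, because non-capturing closures coerce to plain fn pointers. The following is a minimal sketch using only std: the names BoxedFut, AsyncFn, handlers, run_handlers, and the tiny block_on are assumptions made up for illustration (block_on stands in for a real runtime such as Tokio so the example runs without extra crates).

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

// Hypothetical aliases to keep the signatures readable.
type BoxedFut = Pin<Box<dyn Future<Output = Option<i32>>>>;
type AsyncFn = fn(i32) -> BoxedFut;

async fn add_one(i: i32) -> Option<i32> {
    Some(i + 1)
}

async fn to_none(_i: i32) -> Option<i32> {
    None
}

/// Builds the table of handlers; each closure boxes one async fn's future.
fn handlers() -> Vec<AsyncFn> {
    let mut table: Vec<AsyncFn> = Vec::new();
    // Non-capturing closures coerce to plain `fn` pointers.
    table.push(|i: i32| -> BoxedFut { Box::pin(add_one(i)) });
    table.push(|i: i32| -> BoxedFut { Box::pin(to_none(i)) });
    table
}

// Toy stand-in for a runtime: wakes the blocked thread by unparking it.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Drives a single future to completion on the current thread.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}

/// Calls every handler with `input`, awaiting each one in turn.
fn run_handlers(input: i32) -> Vec<Option<i32>> {
    block_on(async move {
        let mut out = Vec::new();
        for handler in handlers() {
            out.push(handler(input).await);
        }
        out
    })
}
```

Inside a Tokio program you would drop block_on and simply .await each handler(input) from an async function.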

Thanks alice!

I tried your changes and compilation works, but the behavior is not what I expected. Clearly my mental model of async is wrong in spite of my research. I've added a little more code here:

use std::{future::Future, pin::Pin, thread::sleep, time::Duration};

async fn add_one(i: usize) -> Option<usize> {
    sleep(Duration::from_secs(i as u64));
    Some(i + 1)
}
fn add_one_async(i: usize) -> Pin<Box<dyn Future<Output = Option<usize>>>> {
    Box::pin(add_one(i))
}

async fn to_none(_i: usize) -> Option<usize> {
    None
}
fn to_none_async(i: usize) -> Pin<Box<dyn Future<Output = Option<usize>>>> {
    Box::pin(to_none(i))
}

#[tokio::main(threaded_scheduler)]
async fn main() {
    let mut futures: Vec<fn(usize) -> Pin<Box<dyn Future<Output = Option<usize>>>>> = vec![];

    futures.push(add_one_async);
    futures.push(to_none_async);

    println!("Before loop");

    for fut in &futures {
        match fut(1).await {
            Some(num) => println!("Future returned {:?}.", num),
            None => println!("Didn't get a number."),
        }
    }

    println!("After loop")
}

The actual output is:

Before loop
Future returned 2.
Didn't get a number.
After loop

But I expected:

Before loop
After loop
Didn't get a number.
Future returned 2.

Thinking a little more, it's clear that await pauses local execution, which makes sense. I had been thinking of Futures more like Promises, which is perhaps incorrect. This leads me to the question of how I might achieve the execution of multiple async functions in parallel and handle their responses in whatever order they return?

Here's a pretty good mental model of how futures work in Rust:

Imagine you had a list of one thousand elements called futures. Each future has a function called poll, which does a little bit of work and then returns quickly. By looping through the list and calling poll on each future, you can do work on all one thousand tasks at once in a single thread.

However, note that this requires the futures to cooperate. If a future spends a long time inside poll, then your thread doesn't have time to go work on the other tasks. In your case you are using thread::sleep, which has exactly this behaviour — it makes poll take a long time. You can see this because it's an operation that takes time without an .await.
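The mental model above can be sketched with nothing but std. Countdown, NoopWaker, run_all, and demo are hypothetical names for illustration. Note one simplification: this toy loop re-polls every future on every pass, whereas a real executor such as Tokio's only re-polls a future after its waker has fired.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// A waker that does nothing: the loop below re-polls everything anyway.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// A cooperative future that needs `remaining` extra polls before finishing.
struct Countdown {
    id: u32,
    remaining: u32,
}

impl Future for Countdown {
    type Output = u32;

    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<u32> {
        if self.remaining == 0 {
            Poll::Ready(self.id)
        } else {
            // Do "a little bit of work", then return quickly.
            self.remaining -= 1;
            Poll::Pending
        }
    }
}

/// Polls all unfinished futures round-robin on one thread and
/// returns their outputs in completion order.
fn run_all(mut tasks: Vec<Pin<Box<dyn Future<Output = u32>>>>) -> Vec<u32> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut finished = Vec::new();
    while !tasks.is_empty() {
        // Keep only the tasks that are still pending after this pass.
        tasks.retain_mut(|task| match task.as_mut().poll(&mut cx) {
            Poll::Ready(id) => {
                finished.push(id);
                false
            }
            Poll::Pending => true,
        });
    }
    finished
}

/// Three tasks that need 3, 0, and 1 extra polls respectively.
fn demo() -> Vec<u32> {
    let mut tasks: Vec<Pin<Box<dyn Future<Output = u32>>>> = Vec::new();
    tasks.push(Box::pin(Countdown { id: 1, remaining: 3 }));
    tasks.push(Box::pin(Countdown { id: 2, remaining: 0 }));
    tasks.push(Box::pin(Countdown { id: 3, remaining: 1 }));
    run_all(tasks)
}
```

Here demo() returns the ids in the order the tasks finish, not the order they were added. If one poll call blocked (for example with thread::sleep), every other task in the list would stall behind it.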

The Tokio library provides two things: the loop that polls all the futures you spawn on it, and various replacements for sleeping and IO that cooperate by not spending a lot of time inside poll. For example, you should use delay_for (renamed to tokio::time::sleep in Tokio 1.0) for sleeping inside async functions.

That said, your problem is still not solved, because as you observed, .await will wait for the thing you await. I did have to mention it though, because you were using the thread::sleep function. To spawn a task that runs independently, you can use the tokio::spawn function, which returns a JoinHandle you can await later. This will make the task you spawned start running immediately, and allow you to wait for it later (e.g. you can put the handles in a vector and loop through them later).

There are also other tools for running things concurrently such as:

  1. the join! macro, which waits for a fixed number of tasks concurrently,
  2. the select! macro, which waits for a fixed number of tasks but exits when one finishes,
  3. the join_all function, which is like join! but for vectors of tasks,
  4. the FuturesUnordered collection, which allows you to wait for the next task and add tasks as you go.

Note that when you exit main, all other tasks are immediately cancelled.

To make an analogy to JavaScript, every .await is like calling then on a JS Promise with the rest of the function as the callback. Using delay_for is like using setTimeout to sleep, whereas thread::sleep is like a while loop that runs for a full second, freezing up the browser window in the meantime.

Keep in mind that Futures do nothing until they are polled, which normally means until you use .await on them or hand them to an executor. Calling an async function does not run any of its code. It only initializes an object to be polled.
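A small std-only sketch of that laziness follows. The names mark, demo, and the toy block_on are assumptions for illustration; block_on stands in for a real runtime.

```rust
use std::cell::Cell;
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Sets the flag when its body actually runs.
async fn mark(ran: &Cell<bool>) -> u32 {
    ran.set(true);
    7
}

// Toy stand-in for a runtime: wakes the blocked thread by unparking it.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Drives a single future to completion on the current thread.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}

/// Returns (flag before polling, future output, flag after polling).
fn demo() -> (bool, u32, bool) {
    let ran = Cell::new(false);
    let fut = mark(&ran); // Only builds the future; the body has not run yet.
    let before = ran.get();
    let value = block_on(fut); // Polling is what executes the body.
    let after = ran.get();
    (before, value, after)
}
```

The flag is still false after the call to mark and only flips once the future is polled.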

This is very different from .then in JS, which just queues another callback on an already-running Promise.

for fut in futures { fut.await } runs one thing at a time, with no concurrency. You should use join_all or FuturesUnordered to run multiple futures at once.

If you really need to run only one future at a time, consider using a Stream. A buffered stream can run futures with a specific concurrency.

@alice Thanks for the very useful response. I have a few other questions that have been nagging at me. All of the popular documentation around Rust and async/await references one or maybe two of:

Perhaps you can confirm my understanding and clarify some potential misunderstandings?

First, historically, it seems like Tokio existed as a framework for asynchronous programming prior to the finalization of the async/await syntax addition to Rust. The futures crate seems to be the blessed crate used for bike-shedding the implementation of the Future trait. Tokio seemed to have its own implementation of Future that was not directly compatible with the futures trait (?).

In the nomenclature of the futures crate, tokio and async-std seem to implement different Executors, with executors being responsible for managing resources for spawned futures (threading or not, green threads or not, etc). Additionally, tokio and async-std both implement asynchronous functions similar to the synchronous ones provided in std. If you are familiar with JavaScript, they are a bit like the promise implementations of bluebirdjs vs awaityjs vs native ES promises (plus some extraneous utilities thrown in).

When it was apparent that the Future trait from futures was going to be moved into std::future, this was the trait adopted and implemented by both async-std and tokio (and any other executor implementations I don't know about). Thus we have the current state of things.

Some questions!

  1. It looks like the futures crate has quite a few options for executors on its own. Is it generally possible to just use those executors instead of tokio's or async-std's?
  2. Are the utility functions for net, io, etc. in async-std and tokio compatible with each other's executors? I.e., is it possible to execute async-std's read_dir with tokio's spawn? It's already apparent that one cannot use delay_for without setting up the tokio runtime with the #[tokio::main] attribute. Is there some way to tell when utilities are executor agnostic?
  3. Leading from question 2, is there a collection of executor-agnostic utilities? Is this even possible?

@kornel Thanks for the clarification. I had figured that Rust futures work more like single valued Observables than Promises but it's good to have confirmation.

Just to put a more useful (for me) example up in case anyone stumbles on this thread. Here is another take on the original code-paste:

use futures::{executor::block_on, future::join_all};
use std::time::Duration;
use tokio::time::delay_for;

async fn foo(i: u64) -> Option<u64> {
    if i <= 3 {
        println!("Delay for {:?}s", i);
        delay_for(Duration::from_secs(i)).await;
    }
    println!("foo({:?})", i);
    match i {
        x if x % 2 == 0 => Some(x),
        _ => None,
    }
}

#[tokio::main]
async fn main() {
    let futures: Vec<_> = vec![foo, foo, foo, foo]
        .iter()
        .enumerate()
        .map(|(index, func)| func(index as u64))
        .collect();

    block_on(async {
        let results = join_all(futures).await;
        println!("Got Results {:?}", results);
    });
}

With Results:

Delay for 0s
Delay for 1s
Delay for 2s
Delay for 3s
foo(0)
foo(1)
foo(2)
foo(3)
Got Results [Some(0), None, Some(2), None]

To use more than one function in the futures vector, one would need to box the results like alice suggested.

I think that to answer your questions, I will first introduce another piece of the puzzle: Notifications. When a future is waiting for something (a timer, a network request), it doesn't make sense to poll the future during this operation because that future currently has nothing more to do. To fix this issue, the poll function introduces a context parameter, which contains a Waker. The future is supposed to store this Waker somewhere, and when the future is ready to continue, the wake function on this waker should be called.

Now the question is, if you are implementing delay_for, how do you trigger a wake-up when the timer elapses? You need some sort of code running externally to the future itself to do this. In Tokio, this external code is part of the executor, and the executor ensures that Tokio's network IO and timer utilities are woken up when the operations are ready to continue. It's the same deal in async-std: The executor sends out notifications on behalf of the futures. The executor module in the futures crate provides a very simple single-threaded executor without any timer or network-IO utilities.

Anyway, this answers why some of the utility functions provided by the executors are not executor agnostic: They need external code to handle wake-up notifications. And of course, if you wanted to make an executor agnostic timer, you could just spawn a thread, do a thread::sleep and emit the wake-up, but that wouldn't be as efficient as Tokio's shared timer utilities.
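The thread-based timer described above might look roughly like this, using only std. ThreadTimer, Shared, demo, and the toy block_on are hypothetical names for illustration, and this sketch starts its helper thread when the timer is created rather than on first poll. It spends one OS thread per timer, which is exactly the inefficiency a shared timer avoids.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};
use std::time::{Duration, Instant};

/// State shared between the future and its helper thread.
struct Shared {
    done: bool,
    waker: Option<Waker>,
}

/// An executor-agnostic (but wasteful) timer: one thread per timer.
struct ThreadTimer {
    shared: Arc<Mutex<Shared>>,
}

impl ThreadTimer {
    fn new(dur: Duration) -> Self {
        let shared = Arc::new(Mutex::new(Shared { done: false, waker: None }));
        let for_thread = Arc::clone(&shared);
        // The "external code": sleeps, then emits the wake-up notification.
        thread::spawn(move || {
            thread::sleep(dur);
            let mut state = for_thread.lock().unwrap();
            state.done = true;
            if let Some(waker) = state.waker.take() {
                waker.wake();
            }
        });
        ThreadTimer { shared }
    }
}

impl Future for ThreadTimer {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let mut state = self.shared.lock().unwrap();
        if state.done {
            Poll::Ready(())
        } else {
            // Store the latest waker so the helper thread can notify us.
            state.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}

// Toy stand-in for a runtime: wakes the blocked thread by unparking it.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Drives a single future to completion on the current thread.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}

/// Waits on a 50 ms timer and reports how long that took.
fn demo() -> Duration {
    let start = Instant::now();
    block_on(ThreadTimer::new(Duration::from_millis(50)));
    start.elapsed()
}
```

Because the wake-up comes from the timer's own thread, the future works on any executor that honours the Waker contract.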

That said, some utilities do not need external code: For example a channel can coordinate the two halves and make one half wake the other half up, meaning that no external code is needed to handle that situation. In fact the entire tokio::sync module is executor agnostic for this reason. Additionally the entire futures crate provides exclusively executor agnostic utilities.
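To illustrate why a channel needs no help from the executor, here is a minimal std-only sketch of a one-shot channel in which the sending half wakes the receiving half directly. Slot, Sender, Receiver, oneshot, demo, and the toy block_on are hypothetical names, not the tokio::sync API, and this sketch does not handle a sender that is dropped without sending (the receiver would then wait forever).

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};
use std::time::Duration;

/// State shared by both halves of the channel.
struct Slot<T> {
    value: Option<T>,
    waker: Option<Waker>,
}

struct Sender<T>(Arc<Mutex<Slot<T>>>);
struct Receiver<T>(Arc<Mutex<Slot<T>>>);

fn oneshot<T>() -> (Sender<T>, Receiver<T>) {
    let slot = Arc::new(Mutex::new(Slot { value: None, waker: None }));
    (Sender(Arc::clone(&slot)), Receiver(slot))
}

impl<T> Sender<T> {
    /// Stores the value and wakes the receiver if it is already waiting.
    fn send(self, value: T) {
        let mut slot = self.0.lock().unwrap();
        slot.value = Some(value);
        if let Some(waker) = slot.waker.take() {
            waker.wake();
        }
    }
}

impl<T> Future for Receiver<T> {
    type Output = T;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
        let mut slot = self.0.lock().unwrap();
        match slot.value.take() {
            Some(value) => Poll::Ready(value),
            None => {
                // Leave our waker where the sender can find it.
                slot.waker = Some(cx.waker().clone());
                Poll::Pending
            }
        }
    }
}

// Toy stand-in for a runtime: wakes the blocked thread by unparking it.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Drives a single future to completion on the current thread.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}

/// Sends 42 from another thread and receives it asynchronously.
fn demo() -> u32 {
    let (tx, rx) = oneshot::<u32>();
    let sender_thread = thread::spawn(move || {
        thread::sleep(Duration::from_millis(20));
        tx.send(42);
    });
    let got = block_on(rx);
    sender_thread.join().unwrap();
    got
}
```

The only code that ever calls wake is the other half of the channel, so nothing here depends on which executor polls the receiver.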

As for the history behind async/await: instead of building your futures using async blocks, you can also do it by combining various methods such as map, then and other functions similar to the combinators on iterators. You can still find them on the FutureExt trait in the futures crate. The futures crate was where we experimented with the Future trait and figured out how it should look, and back then Tokio more or less had the same role it has now: run the futures and provide utilities that require help from the executor. The async-std crate was introduced around when async/await was stabilized.

You may have noticed that I always specifically talked about network IO as opposed to file IO. This is because your OS usually provides either no or very poor APIs for interacting with the file system asynchronously. To handle this, Tokio introduces a spawn_blocking function that can run blocking code on a separate thread, and notify the asynchronous code once this blocking operation finishes. This works because the Tokio thread pool actually has two kinds of threads: core threads and blocking threads. The core threads are limited by the number of CPUs and run the futures you spawn, whereas there can be up to 512 blocking threads at any one time, and these blocking threads are tasked with running blocking pieces of code such as file IO. The reason there are so many blocking threads is that, unlike futures, every single blocking operation monopolizes a full thread.

Various smaller comments:

Note that you can also use Tokio by building a Runtime object and interacting with that.

Don't call block_on inside an async function like this! block_on is blocking code, so you are monopolizing an entire thread in the core Tokio thread pool for your operation. Just do this:

#[tokio::main]
async fn main() {
    let futures: Vec<_> = vec![foo, foo, foo, foo]
        .iter()
        .enumerate()
        .map(|(index, func)| func(index as u64))
        .collect();

    // Await directly: main is already running inside the Tokio runtime.
    let results = join_all(futures).await;
    println!("Got Results {:?}", results);
}

You shouldn't run an executor inside an executor.

Yes, there are quite a few executors in Rust, with Tokio and async-std being the popular ones among them. The reason there is no single executor in Rust is that each one of them has its own pros and cons.

So, it is up to the developer to choose which executor works well for them rather than forcing one single executor on the developer.
