Select_all for async function with async-std task in tokio runtime

Hi,

I'm trying to run a function (that can fail) repeatedly and asynchronously, with some time delay between invocations, until one of the invocations returns successfully.

My current attempt is to have an array of futures, and launch multiple instances of the function, until 10 futures are in progress. Then I select_all on the futures, to wait until one of the futures has finished.

Unfortunately, I get the following error when compiling my program.
I'm not sure where to go from here: I don't know how to fix the error, or why select_all requires Unpin from GenFuture.

$ cargo run
   Compiling select_all_example v0.1.0 (/home/ambiso/Downloads/select_all_example)
error[E0277]: the trait bound `std::future::GenFuture<[static generator@src/main.rs:6:44: 9:2 {u64, std::time::Duration, impl core::future::future::Future, impl core::future::future::Future, ()}]>: std::marker::Unpin` is not satisfied in `impl core::future::future::Future`
  --> src/main.rs:21:17
   |
21 |         let x = select_all(futures).await;
   |                 ^^^^^^^^^^ within `impl core::future::future::Future`, the trait `std::marker::Unpin` is not implemented for `std::future::GenFuture<[static generator@src/main.rs:6:44: 9:2 {u64, std::time::Duration, impl core::future::future::Future, impl core::future::future::Future, ()}]>`
   |
  ::: /home/ambiso/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-util-0.3.1/src/future/select_all.rs:35:29
   |
35 |           I::Item: Future + Unpin,
   |                             ----- required by this bound in `futures_util::future::select_all::select_all`
   |
   = help: the following implementations were found:
             <std::future::GenFuture<T> as std::marker::Unpin>
   = note: required because it appears within the type `impl core::future::future::Future`
   = note: required because it appears within the type `impl core::future::future::Future`

error[E0277]: the trait bound `std::future::GenFuture<[static generator@src/main.rs:6:44: 9:2 {u64, std::time::Duration, impl core::future::future::Future, impl core::future::future::Future, ()}]>: std::marker::Unpin` is not satisfied in `impl core::future::future::Future`
   --> src/main.rs:21:17
    |
21  |         let x = select_all(futures).await;
    |                 ^^^^^^^^^^^^^^^^^^^^^^^^^ within `impl core::future::future::Future`, the trait `std::marker::Unpin` is not implemented for `std::future::GenFuture<[static generator@src/main.rs:6:44: 9:2 {u64, std::time::Duration, impl core::future::future::Future, impl core::future::future::Future, ()}]>`
    |
    = help: the following implementations were found:
              <std::future::GenFuture<T> as std::marker::Unpin>
    = note: required because it appears within the type `impl core::future::future::Future`
    = note: required because it appears within the type `impl core::future::future::Future`
    = note: required because of the requirements on the impl of `core::future::future::Future` for `futures_util::future::select_all::SelectAll<impl core::future::future::Future>`

error: aborting due to 2 previous errors

For more information about this error, try `rustc --explain E0277`.
error: could not compile `select_all_example`.

To learn more, run the command again with --verbose.

Here's my main.rs:

use tokio;
use async_std::task;
use futures::future::select_all;
use std::time::Duration;

async fn some_function() -> Result<(), ()> {
    task::sleep(Duration::from_millis(50)).await;
    Ok(()) // sometimes succeeds, sometimes doesn't - here it succeeds every time
}

#[tokio::main]
async fn main()  {
    let mut futures = Vec::new();

    // goal: keep up to 10 futures in the vec (this loop adds one per iteration)
    loop {
        futures.push(some_function());
        // with some delay in between
        task::sleep(Duration::from_millis(50)).await;
        // handle result as soon as one finishes
        let x = select_all(futures).await;
        // TODO: check if `x` succeeded
        // if success: abort all other futures and return the result
        // if fail: remove the future from the futures, and launch the function again until again 10 functions are running
    }
}

Ideally I'd also like to include the 50 ms sleep in the select_all, so that I don't wait longer than needed when a future finishes during the delay.

Here's also my Cargo.toml:

[package]
name = "select_all_example"
version = "0.1.0"
authors = ["ambiso <ambiso@invalid>"]
edition = "2018"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
tokio = { version = "0.2.0-alpha.6", features = ["macros"] }
async-std = "1.4"
futures = "0.3"

Thanks in advance for any help!

Best,
ambiso

I found a related topic here.

I solved it!

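Apparently I needed to box and pin the futures. select_all requires its futures to be Unpin (the I::Item: Future + Unpin bound in the error above), which the future returned by an async fn is not; .boxed() wraps it in a Pin<Box<...>>, which is always Unpin: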

        futures.push(some_function().boxed());

For this I also needed the FutureExt trait:

use futures::future::FutureExt;
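
To convince myself why boxing satisfies the Unpin bound, here is a minimal std-only sketch. The names assert_unpin and boxed_attempt are made up for illustration, and Box::pin stands in for .boxed() (which, as far as I understand, does the same thing plus a Send bound):

```rust
use std::future::Future;
use std::pin::Pin;

// Stand-in for the `some_function` from the post (no sleep here).
async fn some_function() -> Result<(), ()> {
    Ok(())
}

// Only compiles when called with a type that implements `Unpin`.
fn assert_unpin<T: Unpin>(_value: &T) {}

// Hypothetical helper: boxes and pins one attempt, roughly what
// `.boxed()` does (minus the `Send` bound).
pub fn boxed_attempt() -> Pin<Box<dyn Future<Output = Result<(), ()>>>> {
    let raw = some_function();
    // assert_unpin(&raw); // would not compile: the async fn future is !Unpin
    let boxed = Box::pin(raw);
    assert_unpin(&boxed); // compiles: Pin<Box<_>> is Unpin for any contents
    boxed
}
```

The future itself stays pinned on the heap; only the pointer to it is moved around, which is why select_all can accept it.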

select_all then resolves to a tuple: the result of the completed future, its index in the futures vector, and a vector of the futures that are still incomplete:

        let (res, idx, remaining_futures) = select_all(futures).await;
        futures = remaining_futures;

Here's the full code:

use tokio;
use async_std::task;
use futures::future::select_all;
use futures::future::FutureExt;
use std::time::Duration;

async fn some_function() -> Result<(), ()> {
    task::sleep(Duration::from_millis(50)).await;
    Ok(()) // sometimes succeeds, sometimes doesn't - here it succeeds every time
}

#[tokio::main]
async fn main() {
    let mut futures = Vec::new();

    // goal: keep up to 10 futures in the vec (this loop adds one per iteration; the cap is still TODO)
    loop {
        futures.push(some_function().boxed());
        // with some delay in between
        task::sleep(Duration::from_millis(50)).await;
        // handle result as soon as one finishes
        let (res, idx, remaining_futures) = select_all(futures).await;
        futures = remaining_futures;
        // TODO: check if `res` succeeded
        // if success: abort all other futures and return the result
        // if fail: remove the future from the futures, and launch the function again until again 10 functions are running
    }
}

Note that this discussion has continued here.