Usage of futures::future::select_ok

Hi everyone,

I encountered a problem while using "select_ok" functions, the following is the sample code:

use futures::future::select_ok;
use std::io;
use std::thread::sleep;
use std::time::Duration;
use tokio;

#[tokio::main]
async fn main() {
    let mut handles = Vec::new();
    for _ in 0..5 {
        handles.push(run());
    }

    let ret = select_ok(handles.iter());
}

async fn run() -> io::Result<()> {
    sleep(Duration::from_secs(1));
    Ok(())
}

The error message is:

error[E0277]: the trait bound `&impl core::future::future::Future: core::future::future::Future` is not satisfied
  --> src/main.rs:14:15
   |
14 |     let ret = select_ok(handles.iter());
   |               ^^^^^^^^^ the trait `core::future::future::Future` is not implemented for `&impl core::future::future::Future`
   | 
  ::: /Users/user/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-util-0.3.5/src/future/select_ok.rs:33:20
   |
33 |           I::Item: TryFuture + Unpin,
   |                    --------- required by this bound in `futures_util::future::select_ok::select_ok`
   |
   = note: required because of the requirements on the impl of `futures_core::future::TryFuture` for `&impl core::future::future::Future`

Can anyone help me on this?

Thank you.

I suspect line 14 should be

let ret = select_ok(handles.into_iter());

but I still get the error about Unpin not being implemented for std::future::from_generator::GenFuture<...>.

Yes, I tried your suggestion, the error message is:

rror[E0277]: `std::future::from_generator::GenFuture<[static generator@src/main.rs:17:34: 20:2 {}]>` cannot be unpinned
  --> src/main.rs:14:15
   |
14 |     let ret = select_ok(handles.into_iter());
   |               ^^^^^^^^^ within `impl core::future::future::Future`, the trait `std::marker::Unpin` is not implemented for `std::future::from_generator::GenFuture<[static generator@src/main.rs:17:34: 20:2 {}]>`
...
17 | async fn run() -> io::Result<()> {
   |                   -------------- within this `impl core::future::future::Future`
   | 
  ::: /Users/user/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-util-0.3.5/src/future/select_ok.rs:33:32
   |
33 |           I::Item: TryFuture + Unpin,
   |                                ----- required by this bound in `futures_util::future::select_ok::select_ok`
   |
   = note: required because it appears within the type `impl core::future::future::Future`
   = note: required because it appears within the type `impl core::future::future::Future`

I really believe that into_iter should used instead of iter. You can confirm that

for h in handles() { println!("{:?}", h.await) }

and

for h in handles.into_iter() { println!("{:?}", h.await) }

both work, while

for h in handles.iter() { println!("{:?}", h.await) }

does not. The handles (which are impl Future) must be passed by value, not references to them.

Unfortunately, I don't know how to make the async fn run() -> io::Result<()> function return a type which is also Unpin.

I believe the problem lies here:

fn is_unpin<U: Unpin>(u: U) {}
fn test() {
    is_unpin(run()) // does not compile because of Unpin
}

I have never dived into async/await/Pin/Unpin... :grimacing:

Hi @FedericoStra

Thanks for you reply, yes you are right about the incorrect usage of "iter()", "into_iter()" is what we want.

And expecting to solve the follow up error: return Unpin future of async fn run() -> io::Result<()>

@FedericoStra I google again, and found a solution from this link https://stackoverflow.com/questions/58234162/rust-async-await-check-if-any-future-in-a-list-resolves-to-true-concurrently

We should boxed() the Future, and then it works.

use futures::future::FutureExt;

#[tokio::main]
async fn main() {
    let mut handles = Vec::new();
    for _ in 0..5 {
        handles.push(run().boxed());
    }

    let ret = select_ok(handles.into_iter());
}

async fn run() -> io::Result<()> {
    sleep(Duration::from_secs(1));
    Ok(())
}

Thanks for your great help!

1 Like

Oh... Ok... Well done!
In the link you posted there is in particular this comment by Tim Visée:

Thanks a lot for this! I'd like to explicitly note that boxed() is used to Pin the value, as required by select_ok .

It all seems still a bit confusing to me right now. I guess I have found something new to study :slight_smile:

Yeah, me too. Learned a lot :stuck_out_tongue:

Note that you are not allowed to use methods such as std::thread::sleep in async code. You should use Tokio's delay_for function instead.

@alice Yes, thanks for reminding me, we should use delay_for in async functions. I just use sleep as an example.

As for the Unpin thing, it is because the select_ok combinator returns back all of the futures that are still pending, which involves a move. If you just want to cancel the remaining futures, you can use the following:

use futures::stream::{FuturesUnordered, StreamExt};
use std::future::Future;

async fn select_ok<F, A, B>(
    futs: impl IntoIterator<Item = F>
) -> Result<A, B>
where
    F: Future<Output = Result<A, B>>,
{
    let mut futs: FuturesUnordered<F> = futs.into_iter().collect();

    let mut last_error: Option<B> = None;
    while let Some(next) = futs.next().await {
        match next {
            Ok(ok) => return Ok(ok),
            Err(err) => {
                last_error = Some(err);
            }
        }
    }
    Err(last_error.expect("Empty iterator."))
}

This does not require the future to be boxed.

So... If I understand correctly, run().boxed() calls futures::future::FutureExt::boxed, which calls Box::pin, which calls <Pin<Box<T>> as From<Box<T>>)::from, which calls Box::into_pin, which ultimately calls unsafe Pin::new_unchecked...

At the end of the day, we should end up with a Pin<Box<our original future>>. Now, what I don't understand is how can we obtain something which is Unpin, since the implementation impl<P> Unpin for Pin<P> requires P: Unpin.

Edit: I just realized that this P is not the original future, which is not Unpin, but Box<original future>, which is Unpin.

When using .boxed(), it will explicitly cast to a trait object, so it is a Pin<Box<dyn Future>>, and not a Pin<box<our original future>>. Additionally, P = Box<dyn Future>, and boxes are always unpin regardless of the contents.

1 Like

Awesome! Seems like we could write our own select_ok, new skill is learned!

Uhm... I agree about everything (as I edited my comment), but I still don't see the cast to a trait object. Where does it happen?

It happens because .boxed() returns a BoxFuture which is a type alias for a trait object in a pin-box. Conversion to trait object happens automatically if the types require it.

1 Like

Ok, I see the definition type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + 'a + Send>>;.
So you are saying that if Type: Trait then a value of type Pin<Box<Type>> can be automatically cast to a Pin<Box<dyn Trait>>?

Edit: Uh! It works! :smile:

Yes. This is possible because both Box and Pin implement CoerceUnsized.

1 Like

Thank you very much Alice, I'm learning a lot about portions of the language I haven't explored before.

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.