How to select a Vec of Future?

Hi,

I had a futures::select! but now I’ve modified the code a bit and the first one becomes a Vec of Future (multiple address to listen to with axum, so multiple bind().serve() calls) and I’m not sure on how I’m supposed to fit that in a select!().

I’ve read about FuturesUnordered, but it doesn’t implement Future trait so I’m puzzled.

Thanks for any insight!

What you could do is collect the FuturesUnordered stream into a future you can await:

use futures::stream::FuturesUnordered;
use futures::stream::StreamExt;

async fn foo() {}

#[tokio::main]
async fn main() {
    let futures = vec![foo(), foo(), foo()];
    
    let futures = FuturesUnordered::from_iter(futures);
    
    let _ = futures.collect::<Vec<()>>().await;
}

Playground.

If I have to join multiple futures I usually use join_all instead.

Oh that what I miss read in the doc, collect the FuturesUnordered, not the iterator we construct from ^^

Thanks a lot!

1 Like

Hm, I tried the solution, but something is bothering the compiler:

error[E0599]: the method `collect` exists for struct `futures::stream::FuturesUnordered<&impl futures::Future<Output = std::result::Result<(), std::io::Error>>>`, but its trait bounds were not satisfied
   --> src/lib.rs:146:35
    |
146 |             let futures = futures.collect::<Vec<()>>();
    |                                   ^^^^^^^ method cannot be called on `futures::stream::FuturesUnordered<&impl futures::Future<Output = std::result::Result<(), std::io::Error>>>` due to unsatisfied trait bounds
    |
   ::: /home/geobert/.cargo/registry/src/artifactory.isode.net-83fdf68b7274ef46/futures-util-0.3.25/src/stream/futures_unordered/mod.rs:55:1
    |
55  | pub struct FuturesUnordered<Fut> {
    | --------------------------------
    | |
    | doesn't satisfy `_: futures::StreamExt`
    | doesn't satisfy `_: futures::Stream`
    | doesn't satisfy `_: std::iter::Iterator`
    |
    = note: the following trait bounds were not satisfied:
            `futures::stream::FuturesUnordered<&impl futures::Future<Output = std::result::Result<(), std::io::Error>>>: futures::Stream`
            which is required by `futures::stream::FuturesUnordered<&impl futures::Future<Output = std::result::Result<(), std::io::Error>>>: futures::StreamExt`
            `&futures::stream::FuturesUnordered<&impl futures::Future<Output = std::result::Result<(), std::io::Error>>>: futures::Stream`
            which is required by `&futures::stream::FuturesUnordered<&impl futures::Future<Output = std::result::Result<(), std::io::Error>>>: futures::StreamExt`
            `&mut futures::stream::FuturesUnordered<&impl futures::Future<Output = std::result::Result<(), std::io::Error>>>: futures::Stream`
            which is required by `&mut futures::stream::FuturesUnordered<&impl futures::Future<Output = std::result::Result<(), std::io::Error>>>: futures::StreamExt`
            `futures::stream::FuturesUnordered<&impl futures::Future<Output = std::result::Result<(), std::io::Error>>>: std::iter::Iterator`
            which is required by `&mut futures::stream::FuturesUnordered<&impl futures::Future<Output = std::result::Result<(), std::io::Error>>>: std::iter::Iterator`

warning: unused import: `StreamExt`
  --> src/lib.rs:15:32
   |
15 |     stream::{FuturesUnordered, StreamExt},
   |                                ^^^^^^^^^

I constructed the FuturesUnordered from a Vec<impl Future<Output = Result<(), Error>>> (tls_servers below) which is created by multiple call to the serve() function from axum_server (different addresses).

The code:

let futures = FuturesUnordered::from_iter(tls_servers.iter());
let futures = futures.collect::<Vec<()>>();

You cannot use futures by shared reference. tls_servers.iter() seems to be an iterator over shared references, hence what you got is a FuturesUnordered<&…>, i.e. a FuturesUnordered or shared references to… well… the future type in question. Consider using .into_iter() instead.

As the code is right now, the type FuturesUnordered<&impl Future<…>> does not implement Stream, because &impl Future<…> does not implement Future, though unfortunately the compiler is not helpful enough to point this out, as far as I can see. I mean, it does point out that the FuturesUnordered<&impl Future<…>>: Stream implementation is missing, but it does not point out that this implementation only doesn’t exist because &impl Future<…> is not a future.

Thank you for your insight, I’ve changed for into_iter() but it’s hitting another issue:

error[E0277]: the trait bound `std::vec::Vec<()>: std::iter::Extend<std::result::Result<(), std::io::Error>>` is not satisfied
   --> src/lib.rs:146:35
    |
146 |             let futures = futures.collect::<Vec<()>>();
    |                                   ^^^^^^^ the trait `std::iter::Extend<std::result::Result<(), std::io::Error>>` is not implemented for `std::vec::Vec<()>`
    |
    = help: the following other types implement trait `std::iter::Extend<A>`:
              <std::vec::Vec<T, A> as std::iter::Extend<&'a T>>
              <std::vec::Vec<T, A> as std::iter::Extend<T>>
note: required by a bound in `futures::StreamExt::collect`
   --> /home/geobert/.cargo/registry/src/cargo-mirror-83fdf68b7274ef46/futures-util-0.3.26/src/stream/stream/mod.rs:516:29
    |
516 |     fn collect<C: Default + Extend<Self::Item>>(self) -> Collect<Self, C>
    |                             ^^^^^^^^^^^^^^^^^^ required by this bound in `futures::StreamExt::collect`

is io::Result the issue?

Try collect::<Result<Vec<()>, _>>

… hmm, this might actually not work. Maybe this method though.

I think collect::<Vec<Result<(), std::io::Error>>> is the right thing to collect:

use futures::stream::FuturesUnordered;
use futures::stream::StreamExt;

async fn foo() -> Result<(), std::io::Error> { Ok(()) }

#[tokio::main]
async fn main() {
    let futures = vec![foo(), foo(), foo()];
    
    let futures = FuturesUnordered::from_iter(futures);
    
    let _ = futures.collect::<Vec<Result<(), std::io::Error>>>().await;
}

Playground.

1 Like

Well… okay… that depends on what the desired behavior/result is for the case of Err :slight_smile:

are the errors to be collected into a Vec, or should the futures all be stopped as soon as the first one returns an error.

1 Like

Didn't know try_collect is eager, i.e. stops after the first error. But you are right, this'd make much more sense.

Here an example with try_collect:

use futures::stream::FuturesUnordered;
use futures::stream::TryStreamExt;

async fn foo() -> Result<(), std::io::Error> { Ok(()) }

#[tokio::main]
async fn main() {
    let futures = vec![foo(), foo(), foo()];
    
    let futures = FuturesUnordered::from_iter(futures);
    
    let _: Result<Vec<()>, std::io::Error> = futures.try_collect().await;
}

Playground.

1 Like

As it’s a server listening, I think it would be better to avoid stopping the whole server if one of the address can’t be listened to? (And log it in the log file)

So @jofas ’s solution is the one, if I understand it properly :smile:

EDIT: I need to wrap the call to axum_server's serve() in order to catch the error and log it though

By the way, collecting to () instead of Vec<()> should work, too, as it implements Extend<()> properly, AFAIR; so Result<(), std::io::Error>. Actually… since there’s a playground, I can test it and confirm… yes it works. Maybe not a relevant change for @Geobomatic, as far as I understood the last answer, but it’s a useful thing to know in general, IMO, that once can collect into a ().

3 Likes

Thanks both of you!

1 Like

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.