How to run infinite concurrent and parallel tasks?

Hi,

I'm following this gist: https://gist.github.com/lu4nm3/b8bca9431cdcf19d73040ada13387e58

The gist's code creates x tasks, collects them into a Vec, then awaits them, at most 3 at a time.

How can I turn this into an endless run of at most 3 concurrent and parallel requests?

I'm building a stress-test tool based on this gist, and I now want an option to run indefinitely (until Ctrl-C). I'm not sure how to make it infinite, since this code needs to collect into a Vec.

The iterator doesn't have to be finite. You can use a range like (0..) or std::iter::repeat, neither of which ever stops producing values.

As for the collect, you can replace it with for_each.

Regarding your use of thread::sleep, please read Async: What is blocking? if you have not already. From the context of this post, it seems that you might already know the caveats described there, but I'm just making sure.


Thanks for your answer!

I'm not using sleep in my own code :slight_smile:

Thanks for the for_each tip, that was the piece blocking me :smiley:

I'm going for this code atm:

let fetches = futures::stream::iter(
    (0..)
        .into_iter()
        .map(|_| tokio::spawn(make_request())),
)
.for_each_concurrent(50, |f| async move { f.await });
fetches.await;

but it doesn't compile and I'm not sure why:

error[E0271]: type mismatch resolving `<impl futures::Future as futures::Future>::Output == ()`
  --> src\main.rs:57:6
   |
57 |     .for_each_concurrent(50, |f| async move { f.await });
   |      ^^^^^^^^^^^^^^^^^^^ expected enum `std::result::Result`, found `()`
   |
   = note:   expected enum `std::result::Result<(), tokio::task::JoinError>`
           found unit type `()`

error[E0271]: type mismatch resolving `<impl futures::Future as futures::Future>::Output == ()`
  --> src\main.rs:58:5
   |
58 |     fetches.await;
   |     ^^^^^^^^^^^^^ expected enum `std::result::Result`, found `()`
   |
   = note:   expected enum `std::result::Result<(), tokio::task::JoinError>`
           found unit type `()`
   = note: required because of the requirements on the impl of `futures::Future` for `futures::stream::ForEachConcurrent<futures::stream::Iter<std::iter::Map<std::ops::RangeFrom<{integer}>, [closure@src\main.rs:55:18: 55:80]>>, impl futures::Future, [closure@src\main.rs:57:49: 57:75]>`
   = note: required by `futures::Future::poll`

error[E0698]: type inside `async fn` body must be known in this context
  --> src\main.rs:52:9
   |
52 |     let fetches = futures::stream::iter(
   |         ^^^^^^^ cannot infer type for type `{integer}`
   |
note: the type is part of the `async fn` body because of this `await`
  --> src\main.rs:58:5
   |
58 |     fetches.await;
   |     ^^^^^^^^^^^^^

error[E0698]: type inside `async fn` body must be known in this context
  --> src\main.rs:58:5
   |
58 |     fetches.await;
   |     ^^^^^^^ cannot infer type for type `{integer}`
   |
note: the type is part of the `async fn` body because of this `await`
  --> src\main.rs:58:5
   |
58 |     fetches.await;
   |     ^^^^^^^^^^^^^

error[E0698]: type inside `async fn` body must be known in this context
  --> src\main.rs:58:5
   |
58 |     fetches.await;
   |     ^^^^^^^^^^^^^ cannot infer type for type `{integer}`
   |

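It's because f.await evaluates to Result<(), JoinError>, but for_each_concurrent requires the future returned by the closure to output (). Handle or discard the Result inside the async block so that the block evaluates to ().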


Thank you a lot, again :slight_smile:

It's compiling and running now. The only thing I'm not totally sure of is the parallelism of the code.

for_each_concurrent doc says:

This is similar to StreamExt::for_each, but the futures produced by the closure are run concurrently (but not in parallel-- this combinator does not introduce any threads).

But I'm using tokio::spawn to create the Futures, is that sufficient to run concurrently AND parallely? (is that a word? :sweat_smile: )

I understand why you are confused here, but when you use the two together in this manner, then you are indeed running them in parallel.

Basically, for_each_concurrent stores a collection of active JoinHandles, and whenever this collection holds fewer than 50 of them, it asks the stream for another one. That request causes the stream to call tokio::spawn, spawning a new task that starts running in parallel and whose JoinHandle is added to the collection. Whenever a task finishes, the corresponding JoinHandle completes, and for_each_concurrent removes the handle from the collection, making space for another one.

So in short, the only thing that for_each_concurrent is doing here is calling tokio::spawn at the right time such that at most 50 are running. It doesn't actually run them.


I see, the key is really spawn (and using rt-multi-thread, obviously) but the doc made me doubt :slight_smile:
