How to join an unknown number of futures?

I want to join an unknown number of futures, i.e. I don't know the number beforehand, but I can see from the return value of the futures when I have to stop.

Let's say I have the following function, which returns a bool indicating whether its result should be included in the final result. Basically, this boolean will be true for the first n calls (where n is unknown beforehand) and false after that.

struct TaskResult { /* ... */ }

async fn perform_task(i: usize) -> Result<(bool, TaskResult), String> { /* ... */ }

I can perform this task multiple times and join all the results into a vector like this:

#[tokio::main]
async fn main() -> Result<(), String> {
    let res = futures::future::try_join_all(
        (0..50).map(|i| async move { perform_task(i).await })
    ).await?.into_iter()
        // take_while passes a reference, so the flag has to be dereferenced
        .take_while(|(b, _)| *b)
        .map(|(_, res)| res)
        .collect::<Vec<_>>();
    // res: Vec<TaskResult>

    // ...
    Ok(())
}

This works fine if n <= 50, but normally n is larger than that.

What I would love to do, is to somehow join the futures resulting from

(0..).map(|i| async move { perform_task(i).await })

until the boolean is false. This would probably have to be buffered somehow (i.e. only k tasks are performed simultaneously, when one finishes, the next one from the iterator is spawned). When the condition is false, the rest of the already spawned tasks is cancelled and the unspawned rest of the iterator discarded.

Does something like this exist already (or can I somehow create it from tokio and futures primitives)?

This sounds like you are looking for futures::stream::FuturesOrdered to me. You'd need to handle the buffering logic yourself though, pushing a new future to the queue once the next value resolves.


How about this?

use futures::stream::{StreamExt, TryStreamExt};
use futures::future::ready;
let tasks: Vec<TaskResult> = futures::stream::iter(0..)
    .map(|i| perform_task(i))
    .buffer_unordered(50)
    .try_take_while(|&(b, _)| ready(Ok(b)))
    .map_ok(|(_, res)| res)
    .try_collect().await?;

The number passed to buffer_unordered decides how many are concurrently active.

I tried to get something similar that will let you spawn them on tokio's runtime, but I ended up with this, which is probably a better structure anyway.

use tokio::task::JoinSet;

let mut tasks: Vec<TaskResult> = Vec::new();
let mut set = JoinSet::new();
let mut task_number = 0..;

for i in task_number.by_ref().take(50) {
    set.spawn(perform_task(i));
}

for i in task_number {
    // unwrap 1: panics if the set is empty
    // unwrap 2: panics if a task is cancelled or panicked
    // ?: Result from perform_task
    let (b, res) = set.join_next().await.unwrap().unwrap()?;
    if !b {
        break;
    }
    tasks.push(res);
    
    set.spawn(perform_task(i));
}

drop(set);

This will return the results out of order. If you want them in order, you can use an ordered structure like a VecDeque (and spawn the tasks with tokio::spawn) instead of JoinSet.


This looks very promising! I've implemented it now using buffered instead of buffer_unordered. That is almost exactly doing what I want.
I don't really care about the order of the results, but I definitely care that all results before the first excluded one end up somewhere in the final result. With buffer_unordered, a result with a false can get joined before all of the true ones have been joined, which causes those to be dropped as well.
However, buffered only spawns a new task if the very first one has finished, which isn't completely ideal. I'd need some sort of hybrid between FuturesOrdered and buffer_unordered, I guess.

If I understand this correctly, this has the same problem with possibly dropping some true results, right?


How about something like this? It is unordered (spawning a new task as soon as any of the buffered tasks finishes) and won't drop any tasks received before the one that stops your computations. It might spawn too many tasks though.

Yup.

If your goal is to keep the I/O busy by having a constant number of running tasks, not counting the completed ones, then you'll probably be best off using unordered and then storing the completed-but-not-oldest tasks separately.

That's easy enough to do if you put the index in TaskResult. Then you can put the completed ones in a heap or something, before organizing them into the final Vec.
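The bookkeeping part of that idea can be sketched without any async code. This is only an illustration: `PrefixCollector` is a made-up name, it takes the index as a separate argument rather than reading it from `TaskResult`, and it uses a `BTreeMap` keyed by index in place of a heap so that the result type does not need to implement `Ord`. Results that finish early are parked until every lower index has arrived, and the collector reports completion only when the first `false` has been reached in index order.

```rust
use std::collections::BTreeMap;

/// Accepts indexed results in any order and keeps the run of `true`
/// results that precedes the first `false` (in index order).
pub struct PrefixCollector<T> {
    next_index: usize,                  // lowest index not yet released
    parked: BTreeMap<usize, (bool, T)>, // finished, but a lower index is missing
    kept: Vec<T>,                       // released results, in index order
    done: bool,                         // first `false` reached in index order
}

impl<T> PrefixCollector<T> {
    pub fn new() -> Self {
        PrefixCollector {
            next_index: 0,
            parked: BTreeMap::new(),
            kept: Vec::new(),
            done: false,
        }
    }

    /// Records the result of task `index`. Returns `true` once every result
    /// before the first `false` has been collected, meaning the caller can
    /// stop polling and cancel whatever is still running.
    pub fn push(&mut self, index: usize, keep: bool, value: T) -> bool {
        if self.done {
            return true;
        }
        self.parked.insert(index, (keep, value));
        // Release every parked result that is now contiguous with the prefix.
        while let Some((keep, value)) = self.parked.remove(&self.next_index) {
            if !keep {
                self.done = true;
                self.parked.clear(); // anything after the first `false` is discarded
                break;
            }
            self.kept.push(value);
            self.next_index += 1;
        }
        self.done
    }

    /// Consumes the collector and returns the kept results in index order.
    pub fn into_results(self) -> Vec<T> {
        self.kept
    }
}
```

In the unordered loop, each joined result would be fed to `push` together with its index, and the loop would break when `push` returns `true` instead of at the first `false` it happens to see. Once any `false` has been seen there is no point in spawning tasks with higher indices, so the caller could also stop refilling at that point.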

I also tried to get it so that you don't have to poll tasks after the first false in order to get the ones before false, but it got complicated. I think you'd need to remake FuturesUnordered but change IntoIterator to be ordered.


Try out this crate
