I want to join an unknown number of futures, i.e. I don't know the number beforehand, but I can see from the return value of the futures when I have to stop.
Let's say I have the following function, which returns a bool indicating whether its result should be included in the final result. The boolean will be true for the first n calls (where n is not known beforehand) and false after that.
until the boolean is false. This would probably have to be buffered somehow (i.e. only k tasks run simultaneously; when one finishes, the next one from the iterator is spawned). When the condition is false, the remaining already-spawned tasks are cancelled and the unspawned rest of the iterator is discarded.
Does something like this exist already (or can I somehow create it from tokio and futures primitives)?
This sounds like you are looking for futures::stream::FuturesOrdered to me. You'd need to handle the buffering logic yourself though, pushing a new future to the queue once the next value resolves.
use futures::stream::{StreamExt, TryStreamExt};
use futures::future::ready;

let tasks: Vec<TaskResult> = futures::stream::iter(0..)
    .map(|i| perform_task(i))
    .buffer_unordered(50)
    .try_take_while(|&(b, _)| ready(Ok(b)))
    .map_ok(|(_, res)| res)
    .try_collect()
    .await?;
The number passed to buffer_unordered decides how many are concurrently active.
I tried to get something similar that will let you spawn them on tokio's runtime, but I ended up with this, which is probably a better structure anyway.
use tokio::task::JoinSet;

let mut tasks: Vec<TaskResult> = Vec::new();
let mut set = JoinSet::new();
let mut task_number = 0..;

// Start the first 50 tasks.
for i in task_number.by_ref().take(50) {
    set.spawn(perform_task(i));
}

// Each time a task finishes, either stop or spawn the next one.
for i in task_number {
    // unwrap 1: panics if the set is empty
    // unwrap 2: panics if a task is cancelled or panicked
    // ?: Result from perform_task
    let (b, res) = set.join_next().await.unwrap().unwrap()?;
    if !b {
        break;
    }
    tasks.push(res);
    set.spawn(perform_task(i));
}

// Dropping the JoinSet aborts the tasks that are still running.
drop(set);
This will return the results out of order. If you want them in order, you can use an ordered structure like a VecDeque (and spawn the tasks with tokio::spawn) instead of JoinSet.
This looks very promising! I've implemented it now using buffered instead of buffer_unordered. That is almost exactly doing what I want.
I don't really care about the order of the results, but I definitely care that all results before the first excluded one end up somewhere in the final result. With buffer_unordered, a result with false can be joined before all of the true ones have been joined, which causes those to be dropped as well.
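To make the problem concrete, here is a minimal std-only sketch. It is not the real stream code: take_until_first_false is a hypothetical helper that only mimics what cutting off at the first false does when results arrive in completion order, and each pair stands for (task index, bool returned by the task).

```rust
/// Hypothetical model: keep task indices in completion order and stop at
/// the first `false`, as a take-while on an unordered stream would.
fn take_until_first_false(completions: &[(usize, bool)]) -> Vec<usize> {
    completions
        .iter()
        .take_while(|&&(_, keep)| keep)
        .map(|&(index, _)| index)
        .collect()
}
```

If tasks 0, 1 and 2 return true and task 3 returns false, but they complete in the order 1, 3, 0, 2, then calling the helper with [(1, true), (3, false), (0, true), (2, true)] keeps only task 1. Tasks 0 and 2 are lost even though they should have been included.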
However, buffered only spawns a new task if the very first one has finished, which isn't completely ideal. I'd need some sort of hybrid between FuturesOrdered and buffer_unordered, I guess.
If I understand this correctly, this has the same problem with possibly dropping some true results, right?
How about something like this? It is unordered (spawning a new task as soon as any of the buffered tasks finishes) and won't drop any tasks received before the one that stops your computations. It might spawn too many tasks though.
If your goal is to keep the I/O busy by having a constant number of running tasks, not counting the completed ones, then you'll probably be best off using unordered and then storing the completed-but-not-oldest tasks separately.
That's easy enough to do if you put the index in TaskResult. Then you can put the completed ones in a heap or something, before organizing them into the final Vec.
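A rough sketch of that bookkeeping, using only the standard library. The Completed struct and the ordered_prefix function are hypothetical names made up for illustration, with String standing in for TaskResult. In real code the completions would come from something like JoinSet::join_next or an unordered stream rather than from an iterator.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Hypothetical stand-in for one finished task: its index in the input
/// iterator, the keep/stop flag, and the payload.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
struct Completed {
    index: usize,
    keep: bool,
    value: String,
}

/// Consumes completions in arrival order and returns, sorted by index,
/// the payloads of all tasks before the first one with `keep == false`.
fn ordered_prefix(completions: impl IntoIterator<Item = Completed>) -> Vec<String> {
    // `Reverse` turns the max-heap into a min-heap; the derived `Ord`
    // compares `index` first, so the oldest parked task is on top.
    let mut parked = BinaryHeap::new();
    let mut next = 0; // oldest index that has not been emitted yet
    let mut out = Vec::new();
    for done in completions {
        parked.push(Reverse(done));
        // Emit every parked completion that is now contiguous with `out`.
        while parked.peek().map_or(false, |Reverse(c)| c.index == next) {
            let Reverse(c) = parked.pop().unwrap();
            if !c.keep {
                return out; // first `false` in index order: stop here
            }
            out.push(c.value);
            next += 1;
        }
    }
    out
}
```

With this, a false that arrives early just sits in the heap until every older task has been emitted, so no true result before it is dropped. The function returns as soon as the first false is reached in index order, at which point the remaining tasks can be cancelled.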
I also tried to get it so that you don't have to poll tasks after the first false in order to get the ones before false, but it got complicated. I think you'd need to remake FuturesUnordered but change IntoIterator to be ordered.