StreamExt::buffered

I have a Stream of values representing work items. Each item has an id. The ids aren't sequential, but they always arrive in order (I might get item 36, then 37, then 483, then 991, ...).

I want to do these concurrently, and always be able to answer the question "what's the maximum id X such that item X and all preceding items are done?"

I've got some code using StreamExt::buffered from the futures crate. It's a way of achieving concurrency, but ... I just found out that the limit you pass to .buffered(n) is not the number of futures you can expect to have running concurrently, because finished futures count against the limit!

If your futures finish strictly in the order they are inserted, you'll get n running at all times.

If the latency instead follows a power law, so that futures have a "half-life", about half of the pending futures at any given time will already be finished, so you'll only get about n/2 concurrency.

If you get unlucky, and job 0 of n takes much longer to finish than the others, you'll have no concurrency at all once the other futures in the buffer have finished: only job 0 is still running, and nothing new can be admitted until it yields.
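Here's a toy, tick-based simulation of that worst case (not real futures code; the only part modeled from Buffered's behavior is the rule that results are yielded strictly in order, and the durations and limit are made up for illustration):

```rust
use std::collections::VecDeque;

// Simulate buffered(limit): results are yielded strictly in order, so a
// finished-but-not-yet-yielded future still occupies a buffer slot.
// Returns the number of *running* (unfinished) futures at each tick.
fn simulate_buffered(durations: &[u32], limit: usize) -> Vec<usize> {
    let mut pending: VecDeque<u32> = durations.iter().copied().collect();
    let mut buffer: VecDeque<u32> = VecDeque::new(); // remaining ticks per slot
    let mut running_per_tick = Vec::new();

    loop {
        // Fill free slots from the input stream.
        while buffer.len() < limit {
            match pending.pop_front() {
                Some(d) => buffer.push_back(d),
                None => break,
            }
        }
        if buffer.is_empty() {
            break;
        }
        // One tick: every unfinished future makes progress.
        running_per_tick.push(buffer.iter().filter(|&&r| r > 0).count());
        for r in buffer.iter_mut() {
            *r = r.saturating_sub(1);
        }
        // Only the *front* future can be yielded; finished futures behind
        // it keep occupying their slots.
        while buffer.front() == Some(&0) {
            buffer.pop_front();
        }
    }
    running_per_tick
}

fn main() {
    // Job 0 takes 10 ticks; jobs 1..=7 take 1 tick each; limit 4.
    let running = simulate_buffered(&[10, 1, 1, 1, 1, 1, 1, 1], 4);
    println!("{:?}", running);
}
```

With the limit set to 4, the simulation reports full concurrency only on the first tick; after that the one slow future pins the buffer and concurrency drops to 1 until it finishes.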

Has anyone run into this? Is there a better tool for what I'm doing?

If finished futures didn't count against the limit, it would mean the number of finished futures that have to be stored somewhere is unbounded.

Well - running and finished futures usually tax pretty different resources, though. It seems like a lot of use cases might welcome the ability to configure the two limits (concurrency and memory) independently...

But to be fair, I don't know much about the range of use cases. I'm mainly after something that'll work for my use case!

I think you could use buffered() twice, introducing a std::future::ready or similar dummy future around the result value, with a larger limit on the second one.


How would that work? The .buffered() I already have holds on to finished futures. That won't be changed by adding a second .buffered() after... right?

The buffer holds on to finished futures until they are taken from it. .buffered() (effectively) calls .next() on its underlying stream to fill its buffer. So, in this case, the second buffer pulls the finished futures out of the first buffer. (The fact that it also runs futures is entirely superfluous in this case.)

To be clear, I mean this sort of setup:

futures
    .buffered(4)
    .map(std::future::ready)
    .buffered(128)

There might be a better tool to do this job; this one is just the one I thought of.


Suppose futures contains [p1, p2, p3, p4], of which only p1 takes a long time to finish. The output from futures.buffered(4) will nonetheless be [v1, v2, v3, v4] in that order, right? The Buffered stream holds on to finished futures p2, p3, and p4 until p1 is finished. They can't be taken from it.

Oh, I see, yes. You would need to use buffer_unordered() instead, but that could lead to unbounded buffering of out-of-order results given your requirement for ordering.

I think that there is no adapter in futures that properly solves this problem, and you would have to write something custom to get the ideal behavior (buffer N tasks and M completed items). But since you only care about ID numbers, which are small, buffer_unordered() should do, assuming the items are not so numerous that their IDs can't all fit in memory: as you consume the stream, keep a BTreeSet of completed IDs, and whenever the smallest ID in the set equals the current maximum completed ID + 1, remove it from the set and advance the maximum.
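A minimal sketch of that bookkeeping (the Watermark name is mine; it assumes consecutive IDs starting at 1, so for the sparse IDs in the original question you'd track arrival positions instead):

```rust
use std::collections::BTreeSet;

// Feed in IDs as they complete, in any order; max_contiguous answers
// "what's the highest ID such that it and all earlier IDs are done?".
struct Watermark {
    max_contiguous: u64,      // highest ID with no gaps below it (0 = none yet)
    completed: BTreeSet<u64>, // finished IDs sitting above the watermark
}

impl Watermark {
    fn new() -> Self {
        Watermark { max_contiguous: 0, completed: BTreeSet::new() }
    }

    fn complete(&mut self, id: u64) {
        self.completed.insert(id);
        // Drain the set while its smallest ID is the next one in sequence.
        while self.completed.first() == Some(&(self.max_contiguous + 1)) {
            let id = *self.completed.first().unwrap();
            self.completed.remove(&id);
            self.max_contiguous = id;
        }
    }
}

fn main() {
    let mut w = Watermark::new();
    for id in [2, 3, 1, 5, 4] {
        w.complete(id); // out-of-order completions, as buffer_unordered yields
    }
    println!("{}", w.max_contiguous); // all of 1..=5 are done
}
```

The set only ever holds IDs that completed ahead of the watermark, so its size is bounded by the buffer_unordered limit.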


This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.