Futures with wide join_all() / select_all() fanout

Hi folks, I'm trying to understand the design of the futures-rs library. From reading through the code it looks like the algorithmic complexity of join_all() and select_all() when applied to large vectors is worse than optimal, so I want to check if I'm understanding correctly.

Here's a specific example. Let's say my server is trying to handle an incoming network request, and in order to do so I decide to make N=1000 outgoing requests and wait for all of them to return before responding to the original request -- for example, to do a search over 1000 leaf servers. Looking through the futures-rs documentation, join_all() seems to be the idiomatic way to implement the intended "wait for all to return" semantics.
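For concreteness, here's a minimal sketch of the pattern I mean, written against the futures 0.1-style API (query_leaf is a hypothetical stand-in for one outgoing request):

```rust
extern crate futures;

use futures::future::{self, Future};

// Hypothetical stand-in: a real implementation would perform network I/O
// and yield the leaf server's response.
fn query_leaf(shard: usize) -> Box<Future<Item = Vec<u8>, Error = ()>> {
    Box::new(future::ok(vec![shard as u8]))
}

// Fan out to 1000 leaf servers; the returned future completes only once
// every response has arrived.
fn search_all_leaves() -> Box<Future<Item = Vec<Vec<u8>>, Error = ()>> {
    let requests: Vec<_> = (0..1000).map(query_leaf).collect();
    Box::new(future::join_all(requests))
}
```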

The implementation of poll() for JoinAll traverses a vector of size N to check which futures are ready, making each call an O(N) operation. From my understanding of how futures-rs works, in the worst case poll() will be called every time Task::unpark() is called (namely, once per response), though it may be called less often if some unpark() calls occur close together in time. This means that in the worst case we make N calls to JoinAll::poll(), for an overall cost of O(N^2). In contrast, a hand-coded implementation written directly against epoll (or similar) instead of futures-rs should incur only O(N) end-to-end CPU cost, because epoll reports exactly which file descriptor was triggered, avoiding the need for an O(N) scan over a vector.
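To illustrate the scan I'm referring to, here's a simplified model of a JoinAll-style poll (my own sketch, not the actual futures-rs source):

```rust
// Minimal model of the O(N) scan: every call to poll re-walks the whole
// vector, because nothing records *which* child became ready.
enum Poll<T> {
    Ready(T),
    NotReady,
}

struct Child {
    done: bool,
}

impl Child {
    fn poll(&mut self) -> Poll<()> {
        if self.done { Poll::Ready(()) } else { Poll::NotReady }
    }
}

fn join_all_poll(children: &mut [Child]) -> Poll<()> {
    let mut all_done = true;
    for child in children.iter_mut() {
        if let Poll::NotReady = child.poll() {
            all_done = false;
        }
    }
    if all_done { Poll::Ready(()) } else { Poll::NotReady }
}

fn main() {
    let mut children: Vec<Child> = (0..1000).map(|_| Child { done: false }).collect();
    // One simulated wakeup per response: N wakeups x O(N) scans = O(N^2).
    for i in 0..1000 {
        children[i].done = true;
        let _ = join_all_poll(&mut children);
    }
}
```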

Similar reasoning seems to apply to select_all() when called with wide fanout.

Is my analysis correct? Is there a more efficient idiomatic solution to this problem? Looking further through the futures-rs code, I see with_unpark_event(), whose documentation suggests it may be useful for solving the above problem, but I don't understand how to apply it.


Out of curiosity, what is your use case for wanting to join across 1000 connections?

In most cases that I can think of, you're on the receiving end of that number of connections, so you're not in a position where you would join them.

The main kind of application is one where I have a cluster of computers all working on a single big computation (such as a batch algorithm), rather than lots of small computations (such as web server requests).

Some concrete applications where this would come up:

  • stochastic gradient descent with parameter servers, where the worker servers request parameters from sharded parameter servers, do some computation, and then push deltas back to the parameter servers.

  • a search engine, where I have a leaf server per shard of data, and the root server needs to send the search query to each leaf server and wait for their responses.


I hesitate to offer a solution, because one of the core Tokio maintainers might have a better option. With that disclaimer:

I could imagine registering something with your futures, like a Sender from an MPSC channel, where the Receiver end would handle the positive and negative results from the spawned set of futures. You'd probably need a handle to the task to wake it after you've received results from all those futures in, say, a Monitor future (something custom that you write). Then you'd wait on the result of that Monitor future.

See futures::sync::mpsc::channel - Rust. Tokio should be capable of registering poll events on the Sender/Receiver ends of the channel, properly waking as needed. I use that inside some libraries I've written.
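Roughly the shape I have in mind -- a sketch only, assuming futures 0.1 plus tokio-core, with do_work as a hypothetical stand-in for one outgoing request:

```rust
extern crate futures;
extern crate tokio_core;

use futures::{Future, Sink, Stream};
use futures::sync::mpsc;
use tokio_core::reactor::Handle;

// Hypothetical stand-in for one outgoing request.
fn do_work(i: usize) -> Box<Future<Item = usize, Error = ()>> {
    Box::new(futures::future::ok(i))
}

fn aggregate(handle: &Handle, n: usize) -> Box<Future<Item = Vec<usize>, Error = ()>> {
    let (tx, rx) = mpsc::channel(n);
    for i in 0..n {
        let tx = tx.clone();
        // Each request runs as its own task and pushes its result into
        // the channel when it completes.
        handle.spawn(
            do_work(i).and_then(move |res| tx.send(res).map(|_| ()).map_err(|_| ())),
        );
    }
    drop(tx); // drop our own Sender so the Receiver can close cleanly
    // The "Monitor" future: collect exactly n results from the stream.
    // Each wakeup only pulls ready items off the channel, rather than
    // rescanning all n futures.
    Box::new(rx.take(n as u64).collect())
}
```

Note that if any worker fails without sending, the collect will never see n items, which ties into the failure-handling question below.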

You'd obviously have to decide what to do when any of those futures fail: do the failures matter? Do you submit a new future? Etc. Anyway, just spit-balling here.