Is a parallel pipeline possible with futures and threadpools?

Hi,

First of all, a warm hello to the Rust community. :wave:

I am experimenting with the futures and futures-cpupool libraries to process a stream of inputs in a parallel pipeline. The execution should happen like this:

  1. The source produces data 0.
  2. Stage 0 processes data 0. At the same time, the source produces data 1.
  3. Stage 1 processes data 0. At the same time, stage 0 processes data 1 and the source produces data 2.
  4. ...

However, it is mandatory that every stage processes its inputs in the correct order (the stages might have state). The actual amount of parallelism should be handled by the thread pool.
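For comparison, here is a minimal sketch of the pipeline described above using only the standard library. It deliberately swaps the technique: it uses one dedicated thread per stage and bounded `std::sync::mpsc` channels instead of futures and futures-cpupool, so it is not an answer for the futures API itself. The names `spawn_stage` and `run_pipeline`, the channel capacity of 4, and the two example stages (a running sum and a multiplication) are assumptions made for illustration.

```rust
use std::sync::mpsc::{sync_channel, Receiver};
use std::thread;

/// Spawns one dedicated thread for a stage (hypothetical helper).
/// A single thread drains the input channel, so items are processed
/// strictly in arrival order and the closure may keep mutable state.
fn spawn_stage<F>(name: &str, input: Receiver<u32>, mut work: F) -> Receiver<u32>
where
    F: FnMut(u32) -> u32 + Send + 'static,
{
    // Bounded channel: a fast upstream stage blocks instead of queueing without limit.
    let (tx, rx) = sync_channel(4);
    let _handle = thread::Builder::new()
        .name(name.to_string())
        .spawn(move || {
            for item in input {
                if tx.send(work(item)).is_err() {
                    break; // the downstream stage hung up
                }
            }
        })
        .expect("failed to spawn stage thread");
    rx
}

/// Builds source -> stage 0 -> stage 1 and collects the results in order.
fn run_pipeline(n: u32) -> Vec<u32> {
    let (src_tx, src_rx) = sync_channel(4);
    let source = thread::spawn(move || {
        for i in 0..n {
            src_tx.send(i).unwrap();
        }
    });

    // Stage 0 is stateful: it emits a running sum, so input order matters.
    let mut sum = 0;
    let s0 = spawn_stage("stage-0", src_rx, move |x| {
        sum += x;
        sum
    });
    // Stage 1 is stateless and runs concurrently with stage 0.
    let s1 = spawn_stage("stage-1", s0, |x| x * 10);

    let out: Vec<u32> = s1.iter().collect();
    source.join().unwrap();
    out
}
```

Calling `run_pipeline(5)` feeds the inputs 0 to 4 through both stages. Different stages can work on different items at the same time, while each stage sees its inputs in order because only one thread ever reads from its channel. The trade-off is that the degree of parallelism is fixed at one thread per stage rather than being decided by a pool.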

Unfortunately, I just cannot get it to run in parallel.

My example code gives each stage a random execution time. I get the following output:

Source  produced 1 (in thread "source")
Source  produced 2 (in thread "source")
Stage 0 received 1 (in thread "pool-0")
Source  produced 3 (in thread "source")
Stage 1 received 1 (in thread "pool-1")
Source  produced 4 (in thread "source")
Stage 2 received 1 (in thread "pool-2")
Stage 0 received 2 (in thread "pool-3")
Source  produced 5 (in thread "source")
Stage 1 received 2 (in thread "pool-0")
Stage 2 received 2 (in thread "pool-1")
Source  produced 6 (in thread "source")
Source  produced 7 (in thread "source")
Stage 0 received 3 (in thread "pool-2")
Source  produced 8 (in thread "source")
Stage 1 received 3 (in thread "pool-3")
Stage 2 received 3 (in thread "pool-0")
Source  produced 9 (in thread "source")
Stage 0 received 4 (in thread "pool-1")
Stage 1 received 4 (in thread "pool-2")
Stage 2 received 4 (in thread "pool-3")
Stage 0 received 5 (in thread "pool-0")
Stage 1 received 5 (in thread "pool-1")
Stage 2 received 5 (in thread "pool-2")
Stage 0 received 6 (in thread "pool-3")
Stage 1 received 6 (in thread "pool-0")
Stage 2 received 6 (in thread "pool-1")
Stage 0 received 7 (in thread "pool-2")
Stage 1 received 7 (in thread "pool-3")
Stage 2 received 7 (in thread "pool-0")
Stage 0 received 8 (in thread "pool-1")
Stage 1 received 8 (in thread "pool-2")
Stage 2 received 8 (in thread "pool-3")
Stage 0 received 9 (in thread "pool-0")
Stage 1 received 9 (in thread "pool-1")
Stage 2 received 9 (in thread "pool-2")

Obviously, apart from the producing stage (which runs in its own thread), I cannot get the following stages to overlap: each item passes through all three stages before the next one enters stage 0.

Is it possible to get the futures library to behave the way I want?

I believe the problem here is that you're using Stream, which generally only operates on one element at a time. You probably want to use the buffered combinator, which allows some extra concurrency.

Thanks for your reply, Alex. Unfortunately, I cannot get it to work the way I had hoped for.

Does anybody know of a tutorial on how to use streams with futures-cpupool (instead of tokio event loops)?

The way to go with pipelined stream processing might be the tokio streaming protocols, but defining a Frame, Codec, Protocol and Service seems very network-specific.

I don't believe there's anything specifically for futures-cpupool, but does this help?

Thank you very much for the code, Alex. Unfortunately, the nature of the thread pool means that items can be processed out of order within a stage.

Example output might be:

Stage 0 received 0 (in thread "pool-0")
Stage 1 received 0 (in thread "pool-1")
Stage 0 received 2 (in thread "pool-3")
Stage 0 received 1 (in thread "pool-2")
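One possible way to cope with out-of-order completion like the output above is to tag each item with a sequence number and restore the order before a stateful stage consumes it. The sketch below is an assumption-laden illustration and not part of futures or futures-cpupool: `Resequencer` is a hypothetical helper, and it presumes sequence numbers start at 0 and have no gaps. It only helps when the work done on the pool is stateless, since the stateful part still has to run after resequencing.

```rust
use std::collections::BTreeMap;

/// Buffers (sequence number, value) pairs that arrive out of order and
/// releases them strictly in sequence order (hypothetical helper).
struct Resequencer<T> {
    next: u64,                 // sequence number expected next
    pending: BTreeMap<u64, T>, // items that arrived too early
}

impl<T> Resequencer<T> {
    fn new() -> Self {
        Resequencer { next: 0, pending: BTreeMap::new() }
    }

    /// Accepts one item and returns every item that is now ready, in order.
    fn push(&mut self, seq: u64, value: T) -> Vec<T> {
        self.pending.insert(seq, value);
        let mut ready = Vec::new();
        // Drain the buffer for as long as the next expected item is present.
        while let Some(v) = self.pending.remove(&self.next) {
            ready.push(v);
            self.next += 1;
        }
        ready
    }
}
```

With the arrival order 0, 2, 1 from the example output, pushing item 0 releases it immediately, pushing item 2 releases nothing, and pushing item 1 releases items 1 and 2 together.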