Poor performance of sending items over an mpsc::channel

I've got a stream that's producing some items and I want to execute it on a different thread than the one that consumes them.

Using futures crate I created code like this:


let mut stream = futures::stream::repeat(())
    .take(10_000_000)
    .enumerate()
    .map(move |(i, _)| {
        // .... produce the item here
    });

let (tx, rx) = futures::channel::mpsc::unbounded();
let pool = futures::executor::ThreadPool::builder()
    .pool_size(1)
    .create()
    .unwrap();

pool.spawn_ok(async move {
    while let Some(r) = stream.next().await {
        tx.unbounded_send(r).unwrap();
    }
});

// Receiver:
while let Some(r) = rx.next().await {
    // nothing here
}

I noticed this code can unfortunately transfer only about 900k items per second,
while consuming the source stream directly on the same thread yields over 3 million items per second and uses less CPU.

Is it expected, or am I doing something wrong?

I tried with tokio, and also with bounded channels; with bounded channels the performance was actually even worse.

1 Like

How much work do you do when consuming the stream directly? It might be that a lot of code is optimised out if the body is just a no-op. I'm assuming you're building with the release profile.

Another thought is that when using channels, synchronisation between threads must happen, which doesn't occur when you're consuming the stream directly on one thread.

You can also try crossbeam channels, which are more performant than the standard ones.

3 Likes

std::mpsc is known to be slow. Use crossbeam-channel.

edit: in 2023 this was fixed. std::mpsc now uses crossbeam's implementation, and therefore has the same performance. Despite that, it still has a less flexible, less capable public API, so I think crossbeam is still a good choice.
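As a sanity check, the blocking-channel equivalent of the pipeline above can be sketched with the standard library alone. This is only a sketch: the function name `cross_thread_count` and the item count are illustrative, and the items are plain integers rather than whatever the real stream produces.

```rust
use std::sync::mpsc;
use std::thread;

/// Send `n` items across a std channel from a producer thread and count them
/// on the receiving side.
fn cross_thread_count(n: u64) -> usize {
    let (tx, rx) = mpsc::channel::<u64>();
    let producer = thread::spawn(move || {
        for i in 0..n {
            tx.send(i).unwrap();
        }
        // `tx` is dropped here, which closes the channel and ends `rx.iter()`.
    });
    let count = rx.iter().count();
    producer.join().unwrap();
    count
}

fn main() {
    assert_eq!(cross_thread_count(1_000_000), 1_000_000);
}
```

Since the std::sync::mpsc rewrite mentioned above (Rust 1.67, to my knowledge), this goes through a crossbeam-derived implementation under the hood, so it's a reasonable baseline for raw per-item channel cost.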

2 Likes

Consider batching your sends.
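The idea is to pay the channel's synchronisation cost once per batch instead of once per item. A minimal blocking sketch (the name `sum_batched` and the batch size of 1024 are made up for illustration):

```rust
use std::sync::mpsc;
use std::thread;

/// Producer accumulates items into a local Vec and sends a whole batch per
/// channel operation, amortising the per-send cost over `batch_size` items.
fn sum_batched(n: u64, batch_size: usize) -> u64 {
    let (tx, rx) = mpsc::channel::<Vec<u64>>();
    let producer = thread::spawn(move || {
        let mut batch = Vec::with_capacity(batch_size);
        for i in 0..n {
            batch.push(i);
            if batch.len() == batch_size {
                let full = std::mem::replace(&mut batch, Vec::with_capacity(batch_size));
                tx.send(full).unwrap();
            }
        }
        if !batch.is_empty() {
            tx.send(batch).unwrap(); // flush the final partial batch
        }
    });
    let mut total = 0;
    for batch in rx {
        total += batch.iter().sum::<u64>();
    }
    producer.join().unwrap();
    total
}

fn main() {
    assert_eq!(sum_batched(10_000, 1024), (0..10_000u64).sum::<u64>());
}
```

The batch size is a latency/throughput trade-off: larger batches mean fewer channel operations but more buffering before the consumer sees anything.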

1 Like

Thank you. Batching looks like the way to go.
Crossbeam is not a solution, because it is not async, and mixing blocking code with async code can lead to bad things.
