I've got a stream that produces items, and I want to run it on a different thread than the one that consumes them.
Using the futures crate, I wrote code like this:
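use futures::StreamExt; // brings take/enumerate/map/next into scope

let mut stream = futures::stream::repeat(())
    .take(10000000)
    .enumerate()
    .map(move |(i, _)| {
        // .... produce the item here
    });
// Only the receiver needs to be mutable: unbounded_send takes &self.
let (tx, mut rx) = futures::channel::mpsc::unbounded();
let pool = futures::executor::ThreadPool::builder()
    .pool_size(1)
    .create()
    .unwrap();
// Producer: drive the stream on the pool thread and forward each item.
pool.spawn_ok(async move {
    while let Some(r) = stream.next().await {
        tx.unbounded_send(r).unwrap();
    }
});
// Receiver: read from the channel (the stream itself was moved into the task above).
while let Some(r) = rx.next().await {
    // nothing here
}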
I noticed that this code transfers only about 900k items per second,
whereas consuming the source stream directly yields over 3 million items per second and uses less CPU.
Is this expected, or am I doing something wrong?
I also tried tokio and bounded channels; with bounded channels the performance was even worse.
How much work do you do when consuming the stream directly? If the consumer is just a no-op, a lot of the code may be optimised out, assuming you're building with the release profile.
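One way to check whether the direct-consumption number is inflated by the optimiser is to pass every item through std::hint::black_box before discarding it. The sketch below is only an illustration on a plain iterator: consume_all is a hypothetical helper name, and the same black_box call could be placed in the body of the async receiver loop.

```rust
use std::hint::black_box;
use std::time::{Duration, Instant};

/// Hypothetical helper: drains an iterator and passes every item through
/// `black_box`, so the compiler has to assume each item is actually used
/// and cannot discard the work that produced it.
fn consume_all<I: Iterator>(items: I) -> (u64, Duration) {
    let start = Instant::now();
    let mut count = 0u64;
    for item in items {
        black_box(item);
        count += 1;
    }
    // Return how many items were consumed and how long it took.
    (count, start.elapsed())
}
```

Dividing the returned count by the elapsed time gives an items-per-second figure that is harder for the compiler to shortcut than an empty loop body.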
Another thought is that when using channels, synchronisation between threads must happen on every send and receive, which doesn't occur when you're consuming the stream directly.
You can also try crossbeam channels which are more performant than the standard ones.
Edit: this was fixed in 2023. std::sync::mpsc now uses crossbeam's implementation and therefore has the same performance. It still has a less flexible, less capable public API, though, so I think crossbeam is still a good choice.
Thank you. Batching looks like the way to go.
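For anyone reading later, here is a rough sketch of what batching means: instead of one channel operation per item, the producer collects items into a Vec and sends the whole Vec, so the synchronisation cost is paid once per batch. To keep it dependency-free this version uses plain threads and std::sync::mpsc rather than async code; sum_via_batches and its parameters are made-up names for illustration. In the futures crate, StreamExt::chunks groups a stream's items into Vecs in a similar way.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical helper: produces the numbers `0..total` on a separate thread,
/// sends them over a channel in batches of up to `batch_size` items, and
/// sums them on the calling thread.
fn sum_via_batches(total: u64, batch_size: usize) -> u64 {
    let (tx, rx) = mpsc::channel::<Vec<u64>>();

    let producer = thread::spawn(move || {
        let mut batch = Vec::with_capacity(batch_size);
        for item in 0..total {
            batch.push(item);
            if batch.len() == batch_size {
                // Swap in a fresh buffer and send the full one:
                // one channel operation for `batch_size` items.
                let full = std::mem::replace(&mut batch, Vec::with_capacity(batch_size));
                tx.send(full).unwrap();
            }
        }
        // Flush whatever is left over.
        if !batch.is_empty() {
            tx.send(batch).unwrap();
        }
        // `tx` is dropped here, which closes the channel.
    });

    // Iterating the receiver ends once the sender has been dropped.
    let mut sum = 0u64;
    for batch in rx {
        for item in batch {
            sum += item;
        }
    }
    producer.join().unwrap();
    sum
}
```

The batch size is a trade-off: larger batches mean fewer channel operations but more latency before the consumer sees the first item, so it is worth measuring a few values.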
Crossbeam is not a solution, because it is not async, and blocking calls inside async code can stall the executor.