I've got a stream that's producing some items and I want to execute it on a different thread than the one that consumes them.
Using the futures crate, I wrote code like this:
use futures::StreamExt;

let mut stream = futures::stream::repeat(())
    .take(10000000)
    .enumerate()
    .map(move |(i, _)| {
        // .... produce the item here
    });
let (tx, mut rx) = futures::channel::mpsc::unbounded();
let pool = futures::executor::ThreadPool::builder()
    .pool_size(1)
    .create()
    .unwrap();
// Sender: drive the source stream on the pool thread
pool.spawn_ok(async move {
    while let Some(r) = stream.next().await {
        tx.unbounded_send(r).unwrap()
    }
});
// Receiver:
while let Some(r) = rx.next().await {
    // nothing here
}
I noticed that this code can unfortunately transfer only about 900k items per second,
whereas consuming the source stream directly yields over 3 million items per second and uses less CPU.
Is this expected, or am I doing something wrong?
I also tried with tokio, and with bounded channels as well; with bounded channels the performance was actually even worse.
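For reference, here is a rough sketch of how the two cases can be compared side by side. It is not the code above: to keep it self-contained it uses only the standard library (`std::sync::mpsc` and a plain thread instead of futures channels and a `ThreadPool`), and the names `produce`, `consume_direct`, and `consume_via_channel` are hypothetical stand-ins. The absolute numbers will differ from the futures version, so treat it only as a way to see the per-item cost of the cross-thread hand-off.

```rust
use std::hint::black_box;
use std::sync::mpsc;
use std::thread;
use std::time::Instant;

// Hypothetical stand-in for the real item producer.
fn produce(i: usize) -> usize {
    i.wrapping_mul(31)
}

// Produce and consume the items on the same thread.
fn consume_direct(n: usize) -> usize {
    let mut count = 0;
    for i in 0..n {
        // black_box keeps the optimizer from removing the work.
        black_box(produce(i));
        count += 1;
    }
    count
}

// Produce on a separate thread and hand each item over a channel.
fn consume_via_channel(n: usize) -> usize {
    let (tx, rx) = mpsc::channel();
    let producer = thread::spawn(move || {
        for i in 0..n {
            tx.send(produce(i)).unwrap();
        }
        // tx is dropped here, which ends the receiver loop below.
    });
    let mut count = 0;
    for item in rx {
        black_box(item);
        count += 1;
    }
    producer.join().unwrap();
    count
}

fn main() {
    let n = 1_000_000;

    let start = Instant::now();
    let direct = consume_direct(n);
    println!("direct:  {} items in {:?}", direct, start.elapsed());

    let start = Instant::now();
    let via = consume_via_channel(n);
    println!("channel: {} items in {:?}", via, start.elapsed());
}
```

Running it in release mode (`cargo run --release`) gives more meaningful timings than a debug build.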