I just had an idea for a very simple buffered channel (code below): buffer sends until a good number of items have accumulated, then send them all at once.
It seems to have some potential: in the included tests it runs roughly 10 times faster than a plain std::sync::mpsc channel. However, this only holds when sending large numbers of items, so I am not convinced it is useful in practice, though it might be. The idea seems rather obvious, so is there a crate that already does this? Is it useful? Is it worth publishing as a crate?
I did find a crate with a somewhat similar idea, but it claimed no significant performance advantage (which seemed odd to me).

use std::collections::VecDeque;
use std::sync::mpsc;

/// Number of items buffered before a batch is sent.
const N: usize = 4096;

pub struct Sender<T> {
    v: VecDeque<T>,
    s: mpsc::Sender<VecDeque<T>>,
}

pub struct Receiver<T> {
    v: VecDeque<T>,
    r: mpsc::Receiver<VecDeque<T>>,
}
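
impl<T> Sender<T> {
    /// Buffers `t`, sending the whole batch once `N` items have accumulated.
    pub fn send(&mut self, t: T) {
        self.v.push_back(t);
        if self.v.len() == N {
            self.flush();
        }
    }

    /// Sends the current buffer as one batch and starts a fresh buffer.
    /// Panics if the receiver has been dropped.
    pub fn flush(&mut self) {
        let v = std::mem::take(&mut self.v);
        self.s.send(v).unwrap();
        self.v.reserve(N);
    }
}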
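
impl<T> Drop for Sender<T> {
    fn drop(&mut self) {
        // Send any remaining items. Ignore a send error: if the receiver is
        // already gone, panicking inside `drop` could abort the process.
        if !self.v.is_empty() {
            let _ = self.s.send(std::mem::take(&mut self.v));
        }
    }
}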
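
impl<T> Receiver<T> {
    /// Returns the next item, blocking for a new batch when the local buffer
    /// is empty. Returns `None` once the sender is dropped and all items are consumed.
    pub fn recv(&mut self) -> Option<T> {
        loop {
            if let Some(t) = self.v.pop_front() {
                return Some(t);
            }
            if let Ok(v) = self.r.recv() {
                self.v = v;
            } else {
                return None;
            }
        }
    }
}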

pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
    let (s, r) = mpsc::channel::<VecDeque<T>>();
    let s = Sender {
        v: VecDeque::with_capacity(N),
        s,
    };
    let r = Receiver {
        v: VecDeque::new(),
        r,
    };
    (s, r)
}

#[cfg(test)]
const TSIZE: u64 = 10000000;

#[test]
fn test1() // Uses buffered mpsc channel
{
    use std::thread;
    let start = std::time::Instant::now();
    let (mut tx, mut rx) = channel::<u64>();
    thread::scope(|s| {
        s.spawn(move || {
            for i in 0..TSIZE {
                tx.send(i);
            }
        });
        s.spawn(move || {
            let mut total = 0;
            while let Some(_x) = rx.recv() {
                total += 1;
            }
            assert_eq!(total, TSIZE);
        });
    });
    let t = start.elapsed().as_millis() as u64;
    println!("test1 time elapsed = {} ms", t);
}

#[test]
fn test2() // Uses standard mpsc channel
{
    use std::thread;
    let start = std::time::Instant::now();
    let (tx, rx) = mpsc::channel::<u64>();
    thread::scope(|s| {
        s.spawn(move || {
            for i in 0..TSIZE {
                tx.send(i).unwrap();
            }
        });
        s.spawn(move || {
            let mut total = 0;
            while let Ok(_x) = rx.recv() {
                total += 1;
            }
            assert_eq!(total, TSIZE);
        });
    });
    let t = start.elapsed().as_millis() as u64;
    println!("test2 time elapsed = {} ms", t);
}
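
One way to see why the benefit only shows up with many items: anything below the batch size stays in the sender's buffer and is invisible to the receiver until a flush happens. The following is a minimal standalone sketch of that behavior, not the code above. The names `BatchSender`, `batch_channel`, `demo` and the configurable `cap` parameter are made up for illustration, and it uses a `Vec` rather than a `VecDeque` and returns errors instead of panicking.

```rust
use std::sync::mpsc;

/// Hypothetical batching sender: collects items and forwards them as one Vec.
pub struct BatchSender<T> {
    buf: Vec<T>,
    cap: usize, // assumed configurable batch size (the original uses a const N)
    tx: mpsc::Sender<Vec<T>>,
}

impl<T> BatchSender<T> {
    /// Buffers `t`; forwards the batch once `cap` items have accumulated.
    pub fn send(&mut self, t: T) -> Result<(), mpsc::SendError<Vec<T>>> {
        self.buf.push(t);
        if self.buf.len() >= self.cap {
            self.flush()
        } else {
            Ok(())
        }
    }

    /// Forwards whatever is buffered; does nothing when the buffer is empty.
    pub fn flush(&mut self) -> Result<(), mpsc::SendError<Vec<T>>> {
        if self.buf.is_empty() {
            return Ok(());
        }
        let batch = std::mem::replace(&mut self.buf, Vec::with_capacity(self.cap));
        self.tx.send(batch)
    }
}

/// Creates a batching sender paired with a plain mpsc receiver of batches.
pub fn batch_channel<T>(cap: usize) -> (BatchSender<T>, mpsc::Receiver<Vec<T>>) {
    let (tx, rx) = mpsc::channel();
    let sender = BatchSender {
        buf: Vec::with_capacity(cap),
        cap,
        tx,
    };
    (sender, rx)
}

/// Sends two items into a channel with batch size 4, then flushes.
/// Returns whether the receiver saw nothing before the flush, plus the batch.
pub fn demo() -> (bool, Vec<u32>) {
    let (mut tx, rx) = batch_channel::<u32>(4);
    tx.send(1).unwrap();
    tx.send(2).unwrap();
    // Only two of four slots are filled, so nothing has been forwarded yet.
    let empty_before_flush = rx.try_recv().is_err();
    tx.flush().unwrap();
    let batch = rx.recv().unwrap();
    (empty_before_flush, batch)
}
```

In this sketch the receiver sees nothing until `flush` is called, so a producer that sends items slowly would need to flush explicitly (or on a timer) to keep latency bounded. That trade-off between throughput and latency is probably the main thing to document if this were published.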