Idea for a buffered channel

I just had this idea for a very simple buffered channel (code below). The idea is simply to buffer sends until there are a good number of items, then send them all at once.

It does seem to have some potential: it runs roughly 10 times faster than an unbuffered channel in the included tests. However, this would only hold for large numbers of items, so I am not convinced it is useful in practice, though it might be. The idea seems rather obvious, so is there a crate that already does this? Is this useful? Is it worth publishing as a crate?

I did find a crate with a somewhat similar idea, but it seemed to claim no significant performance advantage (which seemed odd to me).

use std::collections::VecDeque;
use std::sync::mpsc;

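/// Number of items collected before a batch is sent through the channel.
const N: usize = 4096;

/// Sending half: items accumulate in `v` and are sent as one VecDeque when N is reached.
pub struct Sender<T> {
    v: VecDeque<T>,
    s: mpsc::Sender<VecDeque<T>>,
}

/// Receiving half: hands out items from the current batch `v` before waiting for the next.
pub struct Receiver<T> {
    v: VecDeque<T>,
    r: mpsc::Receiver<VecDeque<T>>,
}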

impl<T> Sender<T> {
    pub fn send(&mut self, t: T) {
        self.v.push_back(t);
        if self.v.len() == N {
            self.flush();
        }
    }
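    /// Sends the buffered items now, even if there are fewer than N. Does nothing if empty.
    pub fn flush(&mut self) {
        if self.v.is_empty() {
            return;
        }
        // Swap in a fresh buffer with room for the next batch.
        let v = std::mem::replace(&mut self.v, VecDeque::with_capacity(N));
        self.s.send(v).unwrap();
    }
}

impl<T> Drop for Sender<T> {
    fn drop(&mut self) {
        // Deliver any remaining items, but don't unwrap: the Receiver may already be
        // gone, and a panic inside Drop while unwinding aborts the process.
        if !self.v.is_empty() {
            let _ = self.s.send(std::mem::take(&mut self.v));
        }
    }
}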

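impl<T> Receiver<T> {
    /// Returns the next item, blocking until a batch arrives; returns None once
    /// the Sender has been dropped and every batch has been drained.
    pub fn recv(&mut self) -> Option<T> {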
        loop {
            if let Some(t) = self.v.pop_front() {
                return Some(t);
            }
            if let Ok(v) = self.r.recv() {
                self.v = v;
            } else {
                return None;
            }
        }
    }
}

pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
    let (s, r) = mpsc::channel::<VecDeque<T>>();
    let s = Sender {
        v: VecDeque::with_capacity(N),
        s,
    };
    let r = Receiver {
        v: VecDeque::new(),
        r,
    };
    (s, r)
}

#[cfg(test)]
const TSIZE: u64 = 10000000;

#[test]
fn test1() // Uses buffered mpsc channel
{
    use std::thread;

    let start = std::time::Instant::now();

    let (mut tx, mut rx) = channel::<u64>();
    thread::scope(|s| {
        s.spawn(move || {
            for i in 0..TSIZE {
                tx.send(i);
            }
        });
        s.spawn(move || {
            let mut total = 0;
            while let Some(_x) = rx.recv() {
                total += 1;
            }
            assert!(total == TSIZE);
        });
    });

    let t = start.elapsed().as_millis() as u64;
    println!("test1 Time elapsed ={}", t);
}

#[test]
fn test2() // Uses standard mpsc channel
{
    use std::thread;

    let start = std::time::Instant::now();

    let (tx, rx) = mpsc::channel::<u64>();
    thread::scope(|s| {
        s.spawn(move || {
            for i in 0..TSIZE {
                tx.send(i).unwrap();
            }
        });
        s.spawn(move || {
            let mut total = 0;
            while let Ok(_x) = rx.recv() {
                total += 1;
            }
            assert!(total == TSIZE);
        });
    });

    let t = start.elapsed().as_millis() as u64;
    println!("test2 Time elapsed ={}", t);
}

As bufchan's description notes, buffering comes at a significant cost in latency. That is, while sending 1M messages might be faster with buffering, the time between sending a message and receiving it will generally be much higher. For many channel use cases this will be a dealbreaker. Moreover, you also have an increased risk of deadlocks: sending a message and then blocking until it is received (without dropping the sender) will leave your program stuck.
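To see the latency cost concretely, one can timestamp each message when it is sent and check how old it is on arrival. A minimal sketch using only std, assuming Vec<Instant> batches as a stand-in for a batched channel; the function names are hypothetical:

```rust
use std::sync::mpsc;
use std::thread;
use std::time::{Duration, Instant};

/// Sends `count` timestamps one by one over a plain std channel and returns the
/// largest delay observed between sending a message and receiving it.
fn max_latency_unbatched(count: usize) -> Duration {
    let (tx, rx) = mpsc::channel::<Instant>();
    let producer = thread::spawn(move || {
        for _ in 0..count {
            tx.send(Instant::now()).unwrap();
        }
    });
    let mut worst = Duration::ZERO;
    for sent_at in rx {
        worst = worst.max(sent_at.elapsed());
    }
    producer.join().unwrap();
    worst
}

/// Same measurement, but timestamps travel in Vecs of `batch_size` items, so each
/// one also waits for the rest of its batch to be produced before it can be received.
fn max_latency_batched(count: usize, batch_size: usize) -> Duration {
    let (tx, rx) = mpsc::channel::<Vec<Instant>>();
    let producer = thread::spawn(move || {
        let mut buf = Vec::with_capacity(batch_size);
        for _ in 0..count {
            buf.push(Instant::now());
            if buf.len() == batch_size {
                tx.send(std::mem::replace(&mut buf, Vec::with_capacity(batch_size))).unwrap();
            }
        }
        if !buf.is_empty() {
            tx.send(buf).unwrap(); // final partial batch
        }
    });
    let mut worst = Duration::ZERO;
    for chunk in rx {
        for sent_at in chunk {
            worst = worst.max(sent_at.elapsed());
        }
    }
    producer.join().unwrap();
    worst
}
```

Calling both with the same count and printing the results gives a rough comparison; the numbers depend heavily on the machine and the scheduler, so no particular values are claimed here.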

Is this useful? Worth publishing as a crate?

Do you have a use case for this? Otherwise it feels like a solution in search of a problem.

Maybe. I could possibly use it for my flate3 compression crate, which has a thread that finds matches and sends them for encoding, although I think it would only be (potentially) useful when compressing large amounts of data.

I don't quite follow your comment about deadlock, although of course the buffering means the consumer has to wait until the producer finishes, or until there are enough items for the buffer to be sent. So it cannot be used in every case. The "N" value should probably be a parameter for channel creation, as the appropriate size depends on the amount of data to be processed.
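A minimal sketch of that suggestion, assuming a hypothetical constructor `channel_with_batch_size` that takes the batch size at runtime instead of the constant `N`. It also adds a non-blocking `try_recv`, which makes the latency trade-off visible: nothing is received until a batch has been flushed.

```rust
use std::collections::VecDeque;
use std::sync::mpsc;

/// Sending half: items accumulate locally until `batch_size` is reached.
pub struct BatchSender<T> {
    buf: VecDeque<T>,
    batch_size: usize,
    tx: mpsc::Sender<VecDeque<T>>,
}

/// Receiving half: drains the current batch before pulling the next one.
pub struct BatchReceiver<T> {
    buf: VecDeque<T>,
    rx: mpsc::Receiver<VecDeque<T>>,
}

/// Hypothetical constructor: the batch size is chosen per channel.
pub fn channel_with_batch_size<T>(batch_size: usize) -> (BatchSender<T>, BatchReceiver<T>) {
    assert!(batch_size > 0, "batch size must be non-zero");
    let (tx, rx) = mpsc::channel();
    (
        BatchSender { buf: VecDeque::with_capacity(batch_size), batch_size, tx },
        BatchReceiver { buf: VecDeque::new(), rx },
    )
}

impl<T> BatchSender<T> {
    pub fn send(&mut self, t: T) {
        self.buf.push_back(t);
        if self.buf.len() >= self.batch_size {
            self.flush();
        }
    }

    /// Sends any buffered items now; does nothing if the buffer is empty.
    pub fn flush(&mut self) {
        if !self.buf.is_empty() {
            let fresh = VecDeque::with_capacity(self.batch_size);
            let batch = std::mem::replace(&mut self.buf, fresh);
            self.tx.send(batch).expect("receiver dropped");
        }
    }
}

impl<T> Drop for BatchSender<T> {
    fn drop(&mut self) {
        // Deliver leftovers; ignore the error if the receiver is already gone.
        if !self.buf.is_empty() {
            let _ = self.tx.send(std::mem::take(&mut self.buf));
        }
    }
}

impl<T> BatchReceiver<T> {
    /// Blocking receive; None once the sender is dropped and all batches are drained.
    pub fn recv(&mut self) -> Option<T> {
        loop {
            if let Some(t) = self.buf.pop_front() {
                return Some(t);
            }
            self.buf = self.rx.recv().ok()?;
        }
    }

    /// Non-blocking receive; None if no batch has been flushed yet.
    pub fn try_recv(&mut self) -> Option<T> {
        if self.buf.is_empty() {
            self.buf = self.rx.try_recv().ok()?;
        }
        self.buf.pop_front()
    }
}
```

With a batch size of 3, two sends leave `try_recv` empty-handed; the third send flushes the batch and the items arrive in order.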

Edit: I did the same test with bufchan. Sometimes it is slightly (20%) slower than my buffered channel, sometimes much (5x) slower, which is a little puzzling. My vague suspicion is that any kind of shared (read/write) memory usage beyond what is essential slows things down a lot.

Edit 2: When using mimalloc as the global allocator, the poor (5x) bufchan performance goes away. I have no idea why, but I suppose it is some issue with sharing cache lines or similar. The same happened with two other global allocators I tried.

This is commonly called batching, and it is used widely across many areas of computer science and engineering.

The general idea is always the same: instead of processing many small units one by one, you collect them into a larger group and process that group as a single batch.

Examples are everywhere: small messages can be accumulated and sent together; in audio or signal processing, samples are processed in blocks; in computer graphics, draw commands are batched before being submitted to the GPU; in databases, multiple inserts or updates are grouped into a single transaction; and in large-scale computing, work is split into partitions and distributed across nodes.

Usually, this does not require a special “batching” channel implementation. You can send and receive Vecs instead. On the sender side, create a Vec with the capacity you expect, fill it with items, and send the whole vector once it is ready.

This often works well because the sender is typically the component that knows how many small items it wants to send, or when it is worth flushing the current batch.
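A minimal sketch of that suggestion using only std: the channel carries `Vec<u64>`, and the producer decides when a batch is full or when to flush the remainder. The function names and the u64 payload are illustrative assumptions.

```rust
use std::sync::mpsc;
use std::thread;

/// Producer side: fill a Vec up to `batch_size` items, then send it whole.
fn produce(tx: mpsc::Sender<Vec<u64>>, count: u64, batch_size: usize) {
    let mut batch = Vec::with_capacity(batch_size);
    for i in 0..count {
        batch.push(i);
        if batch.len() == batch_size {
            // Hand over the full Vec and start a fresh one with the same capacity.
            let full = std::mem::replace(&mut batch, Vec::with_capacity(batch_size));
            tx.send(full).expect("receiver hung up");
        }
    }
    // The sender knows when it is done, so it flushes the final partial batch.
    if !batch.is_empty() {
        tx.send(batch).expect("receiver hung up");
    }
}

/// Runs a producer thread and sums every item on the receiving side.
fn sum_batched(count: u64, batch_size: usize) -> u64 {
    let (tx, rx) = mpsc::channel::<Vec<u64>>();
    let producer = thread::spawn(move || produce(tx, count, batch_size));
    // Iterating the receiver ends once the producer has dropped `tx`.
    let total: u64 = rx.into_iter().flatten().sum();
    producer.join().unwrap();
    total
}
```

No wrapper type is needed: the receiving side simply flattens the incoming Vecs.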

Well, I published it as a crate:

And it is used by my flate3 crate.

For those who find it on crates.io, would it make sense to say it has better throughput but worse latency than std::sync::mpsc::channel? That's my impression based on the conversation. Is that correct?

Yes, the buffering (or batching, whatever you want to call it) means the latency is higher: the Receiver will not get anything until the buffer limit is reached (or the Sender is dropped, or flush is called).

Buffering is a great optimization for amortizing the cost of channel operations, but it is a recipe for deadlock. If you don't explicitly flush and you have a cycle in your program (you send from A to B and from B to A; A buffers messages; A doesn't drop its sender until it gets some message back from B, because it might send more messages), it can get stuck.
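A minimal sketch of that cycle, assuming a hypothetical `BufferedSender` (not bchan's actual type) whose batch size is larger than the number of requests sent. Thread B only answers what it has received, so A must flush before blocking on the replies; removing the `a.flush()` line would leave A waiting on B and B waiting on A.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical buffered sender: items are only delivered when `batch_size`
/// is reached or `flush` is called.
struct BufferedSender<T> {
    buf: Vec<T>,
    batch_size: usize,
    tx: mpsc::Sender<Vec<T>>,
}

impl<T> BufferedSender<T> {
    fn send(&mut self, t: T) {
        self.buf.push(t);
        if self.buf.len() >= self.batch_size {
            self.flush();
        }
    }

    fn flush(&mut self) {
        if !self.buf.is_empty() {
            let _ = self.tx.send(std::mem::take(&mut self.buf));
        }
    }
}

/// A sends requests to B through the buffered sender; B replies with each value doubled.
fn round_trip(requests: &[u32]) -> Vec<u32> {
    let (req_tx, req_rx) = mpsc::channel::<Vec<u32>>();
    let (reply_tx, reply_rx) = mpsc::channel::<u32>();

    let b = thread::spawn(move || {
        for batch in req_rx {
            for x in batch {
                reply_tx.send(x * 2).unwrap();
            }
        }
    });

    let mut a = BufferedSender { buf: Vec::new(), batch_size: 1024, tx: req_tx };
    for &r in requests {
        a.send(r);
    }
    // The essential step: deliver the partial batch before waiting for replies.
    a.flush();
    let replies: Vec<u32> = (0..requests.len()).map(|_| reply_rx.recv().unwrap()).collect();

    drop(a); // dropping the sender ends B's receive loop
    b.join().unwrap();
    replies
}
```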

Why?

Is it correct that your first release of bchan has version 1.0.0?

Yes. It is a pretty simple crate, I don't anticipate it changing much if at all. I did forget to document methods, so I might do that at some point, although I think they are fairly self-explanatory for anyone familiar with std::sync::mpsc::channel.

I could also do some performance analysis to try and determine in what situations there is a performance gain, but I don't know if I will get around to that soon or ever.

It seems a useful abstraction, and there doesn't seem to be another crate that does the same thing, so I decided to go ahead and publish it.

Oh yes, and the purpose of the post was partly to see if I missed something.

I just did some performance testing on flate3, and the buffered channel does make a big difference: compressing a 39MB text file (actually a SQL text dump of a database) down to 6.7MB took 1.7 seconds with the old version and now takes about 1 second.

Another optimisation using

use pstd::{localalloc::Local, veca};
type LVec<T> = pstd::VecA<T, Local>;

for the internal Vecs gained about another 10%. Overall I am quite surprised; I thought the difference would be negligible. I guess I ought to compare it with other implementations of RFC 1951 such as flate2.

There are already crates that do this, because this is not a new idea. Channels offering a batched API have existed for a long time. You could also fork Crossbeam and add batched send and receive APIs to it.

Here is another batched-channel post that I have seen: Simple *and* fast channel? Too good to be true

Here are some tips I know of for implementing a channel:

  • do not busy-loop
  • have a backpressure system
  • offer a single-threaded API without any multithreading synchronization overhead, and also a multithreaded API using lock-free atomic operations (avoiding false sharing in the multithreaded API)
  • use a stack-allocated ring buffer
  • have both a non-batching API and a batching API
  • have thread affinity functionality
  • have a NUMA-aware API
  • have both blocking and non-blocking APIs
  • use CPU prefetching
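Two of the tips above, backpressure and blocking sends, can be sketched with std alone: `std::sync::mpsc::sync_channel` is a bounded channel whose `send` blocks while the queue is full (`try_send` is the non-blocking counterpart). The function name and parameters below are made up for illustration.

```rust
use std::sync::mpsc;
use std::thread;

/// Sends `count` items in batches through a bounded channel and returns how many
/// items arrived. At most `max_in_flight` batches can be queued, so a fast producer
/// cannot run arbitrarily far ahead of a slow consumer.
fn bounded_batches(count: u64, batch_size: usize, max_in_flight: usize) -> u64 {
    let (tx, rx) = mpsc::sync_channel::<Vec<u64>>(max_in_flight);
    let producer = thread::spawn(move || {
        let mut batch = Vec::with_capacity(batch_size);
        for i in 0..count {
            batch.push(i);
            if batch.len() == batch_size {
                // Blocks here while the queue already holds `max_in_flight` batches.
                let full = std::mem::replace(&mut batch, Vec::with_capacity(batch_size));
                tx.send(full).unwrap();
            }
        }
        if !batch.is_empty() {
            tx.send(batch).unwrap();
        }
    });
    let mut received = 0u64;
    for batch in rx {
        received += batch.len() as u64;
    }
    producer.join().unwrap();
    received
}
```

With `max_in_flight` set to 0 the channel becomes a rendezvous channel: every send waits for the receiver to take the batch.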

It is useful in some cases. That is why having send(), receive() and send_batched(), receive_batched() is better than offering only an unbatched API: it lets you always choose whichever is best for the specific use case.
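A minimal sketch of a channel exposing both styles, built on a std channel of `Vec<T>`. The names `Tx`, `Rx`, `batched_channel`, `send_batch` and `recv_batch` are hypothetical and not taken from any existing crate.

```rust
use std::collections::VecDeque;
use std::sync::mpsc;

pub struct Tx<T> {
    pending: Vec<T>,
    batch_size: usize,
    inner: mpsc::Sender<Vec<T>>,
}

pub struct Rx<T> {
    pending: VecDeque<T>,
    inner: mpsc::Receiver<Vec<T>>,
}

pub fn batched_channel<T>(batch_size: usize) -> (Tx<T>, Rx<T>) {
    let (inner_tx, inner_rx) = mpsc::channel();
    (
        Tx { pending: Vec::new(), batch_size, inner: inner_tx },
        Rx { pending: VecDeque::new(), inner: inner_rx },
    )
}

impl<T> Tx<T> {
    /// Per-item send: buffered, delivered when the batch fills or on flush.
    pub fn send(&mut self, t: T) -> Result<(), mpsc::SendError<Vec<T>>> {
        self.pending.push(t);
        if self.pending.len() >= self.batch_size {
            self.flush()?;
        }
        Ok(())
    }

    /// Batched send: the caller already has a batch, so it goes through as-is
    /// (after flushing anything pending, to preserve ordering).
    pub fn send_batch(&mut self, batch: Vec<T>) -> Result<(), mpsc::SendError<Vec<T>>> {
        self.flush()?;
        if !batch.is_empty() {
            self.inner.send(batch)?;
        }
        Ok(())
    }

    pub fn flush(&mut self) -> Result<(), mpsc::SendError<Vec<T>>> {
        if !self.pending.is_empty() {
            self.inner.send(std::mem::take(&mut self.pending))?;
        }
        Ok(())
    }
}

impl<T> Drop for Tx<T> {
    fn drop(&mut self) {
        let _ = self.flush(); // best effort; the receiver may be gone
    }
}

impl<T> Rx<T> {
    /// Per-item receive; None once all senders are gone and everything is drained.
    pub fn recv(&mut self) -> Option<T> {
        while self.pending.is_empty() {
            self.pending = self.inner.recv().ok()?.into();
        }
        self.pending.pop_front()
    }

    /// Batched receive: returns what is already buffered, else the next whole batch.
    pub fn recv_batch(&mut self) -> Option<Vec<T>> {
        if !self.pending.is_empty() {
            return Some(self.pending.drain(..).collect());
        }
        self.inner.recv().ok()
    }
}
```

Mixing the two on one channel keeps ordering because `send_batch` flushes pending per-item sends first, and `recv_batch` hands back leftovers from a partially consumed batch before pulling a new one.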

A long time ago (many years) I tried crossbeam when I first wrote flate3. I cannot remember the outcome, but apparently I didn't find it helped (much). Of course it may have changed in the meantime.

Bufchan is a different (more complex) approach. It has similar performance but seems to be a bit slower (and occasionally much, 5x, slower), I think because there is still a shared buffer. There are trade-offs. My approach keeps allocating new Vecs (well, VecDeques), which are sent through the channel and eventually deallocated. So you have more allocation/deallocation overhead, but on the other hand you do not have threads reading and writing the same cache line "often" (for each element transferred, rather than once every several thousand items). Although... bufchan does do batching, just differently, so I don't know why it is slower. It does have an extra move for each element, which may be part of it.

I am no expert on cache lines and how processors work, though. I just have a very vague intuition, from some things I have read, that memory synchronisation between processors is "expensive". What I have just written may be all wrong, but anyway my (very simple) approach does seem to work well.
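One possible way to reduce the allocation/deallocation overhead mentioned above, sketched here as a hypothetical variation (not something bchan or bufchan is stated to do): the consumer sends emptied Vecs back on a second channel, and the producer reuses one whenever it is available, allocating only when none has come back yet.

```rust
use std::sync::mpsc;
use std::thread;

/// Sends `count` items in batches of `batch_size`, recycling emptied Vecs that
/// the consumer returns. Returns (items received, buffers the producer created).
fn recycled_batches(count: u64, batch_size: usize) -> (u64, usize) {
    let (full_tx, full_rx) = mpsc::channel::<Vec<u64>>();
    let (empty_tx, empty_rx) = mpsc::channel::<Vec<u64>>();

    let producer = thread::spawn(move || {
        let mut buffers_created = 1usize;
        let mut batch = Vec::with_capacity(batch_size);
        for i in 0..count {
            batch.push(i);
            if batch.len() == batch_size {
                // Reuse a buffer the consumer has handed back, if any; else create one.
                let next = empty_rx.try_recv().unwrap_or_else(|_| {
                    buffers_created += 1;
                    Vec::with_capacity(batch_size)
                });
                full_tx.send(std::mem::replace(&mut batch, next)).unwrap();
            }
        }
        if !batch.is_empty() {
            full_tx.send(batch).unwrap();
        }
        buffers_created
    });

    let mut received = 0u64;
    for mut batch in full_rx {
        received += batch.len() as u64;
        batch.clear(); // drop the contents but keep the capacity
        // The producer may already have finished; then the send fails and the Vec is freed.
        let _ = empty_tx.send(batch);
    }
    let buffers_created = producer.join().unwrap();
    (received, buffers_created)
}
```

How many buffers actually get reused depends on thread timing, so the count of created buffers varies from run to run; the upper bound is one per full batch plus the initial one.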

To be clear: bufchan may well work better in some cases (relatively small buffers, where allocation/deallocation overhead is more significant), but I think my approach works best for the purposes of flate3, at least for large inputs. And of course it might depend on the hardware; I have only tested on my personal Chromebook.

I mean that both your approach and bufchan can be improved further.

Your approach wraps std mpsc, adding a VecDeque as temporary storage that collects the N batched messages (std's mpsc is based on crossbeam-channel). You could replace the VecDeque with a fixed-size, stack-allocated ring buffer (or try a heap-allocated ring buffer, because copying large stack data can be slower than copying a heap pointer when the data is big, e.g. when the batch size N is large). bufchan, by contrast, uses Arc + Mutex + VecDeque. I did a channel benchmark: ring-buffer-based channels are faster than the std channel (which is crossbeam), flume, etc. See fuji-184/Rust_Channel_Benchmark on GitHub.
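A minimal sketch of a fixed-capacity ring buffer that could stand in for the VecDeque accumulator, assuming a compile-time capacity `N`. It uses `Option<T>` slots to stay in safe Rust; a tuned version would more likely use `MaybeUninit<T>`, and a cross-thread SPSC ring buffer additionally needs atomic head/tail indices. Because the slots are stored inline, a large `N` makes the struct itself expensive to move, which is the reason for the heap-allocated alternative mentioned.

```rust
/// Fixed-capacity FIFO ring buffer; capacity `N` is fixed at compile time.
pub struct RingBuffer<T, const N: usize> {
    slots: [Option<T>; N],
    head: usize, // index of the oldest element
    len: usize,
}

impl<T, const N: usize> RingBuffer<T, N> {
    pub fn new() -> Self {
        Self { slots: std::array::from_fn(|_| None), head: 0, len: 0 }
    }

    pub fn len(&self) -> usize {
        self.len
    }

    pub fn is_full(&self) -> bool {
        self.len == N
    }

    /// Appends at the tail; hands the item back if the buffer is full.
    pub fn push(&mut self, t: T) -> Result<(), T> {
        if self.is_full() {
            return Err(t);
        }
        let tail = (self.head + self.len) % N; // wraps around the end of the array
        self.slots[tail] = Some(t);
        self.len += 1;
        Ok(())
    }

    /// Removes the oldest item, if any.
    pub fn pop(&mut self) -> Option<T> {
        if self.len == 0 {
            return None;
        }
        let t = self.slots[self.head].take();
        self.head = (self.head + 1) % N;
        self.len -= 1;
        t
    }
}
```

A sender built on this would push items until `is_full()` returns true and then hand the whole batch over, instead of checking `len() == N` on a VecDeque.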

So ring buffer + batching + CPU cache locality would be a very good combination for a high-throughput channel, potentially handling billions of messages per second.

Oh, sorry, I misunderstood you. Well... yes, maybe you can get a small improvement that way. I could try it. I will have a look at ringbuffer_spsc. Well... it looks interesting, but quite a bit more complex. I could come back to it.

Well, flate3 does seem to be faster than flate2, although compression is a little worse (I can tune it). My current test result:

Flate3 Time elapsed = 964 ms, comp len = 6750238
Flate2 Time elapsed = 1614 ms, comp len = 6678555

I am quite pleased with this. It isn't terrible!

Well, that is flate2 at level 6 (default features). Selecting level 5, the results are pretty close:

Flate3 Time elapsed = 946 ms, comp len = 6750238 (slightly slower, slightly more compression)
Flate2 Time elapsed = 841 ms, comp len = 6786383

So I have some work to do to claim any significant advantage!

Using a bigger block size (0x4000 instead of 0x2000) and flate2 level 6, I get:

Flate3 Time elapsed = 908 ms, comp len = 6676319
Flate2 Time elapsed = 1616 ms, comp len = 6678555

So significantly faster with similar (slightly better) compression. But manually adjusting the block size feels like cheating! I do have a dynamic block size option, which works quite well in this case:

Flate3 Time elapsed = 960 ms, comp len = 6597310
Flate2 Time elapsed = 1559 ms, comp len = 6678555

Maybe I will leave that enabled by default in future. A Google search says:

"zlib emits a deflate block once a selected number of literals + length/distance pairs have been generated. By default, that number is 16383."

which is quite interesting. I guess someone researched that quite carefully (and I have done nothing of the sort). I could do something similar. It seems my default block size of 0x2000 (input bytes) is too small; I have no idea why I chose that, maybe it was just a random guess! It is quite hard to make a fair comparison.
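To illustrate the difference between the two policies, here is a sketch with a hypothetical `Token` type (not flate3's or zlib's actual data structures). One splitter closes a block after a fixed number of symbols, as the zlib quote describes; the other closes a block after a fixed number of input bytes, like a 0x2000 input block size. Deflate matches can be up to 258 bytes long, so with long matches a symbol-count limit produces far fewer, larger blocks.

```rust
/// LZ77 output symbol (hypothetical type for illustration).
#[derive(Debug, Clone, Copy, PartialEq)]
enum Token {
    Literal(u8),
    Match { length: u16, distance: u16 },
}

impl Token {
    /// Number of input bytes this token covers.
    fn input_len(&self) -> usize {
        match *self {
            Token::Literal(_) => 1,
            Token::Match { length, .. } => length as usize,
        }
    }
}

/// Blocks of at most `max_symbols` tokens (literals + length/distance pairs).
/// `max_symbols` must be non-zero.
fn split_by_symbol_count(tokens: &[Token], max_symbols: usize) -> Vec<&[Token]> {
    tokens.chunks(max_symbols).collect()
}

/// A new block starts once `max_input_bytes` bytes of input have been covered.
fn split_by_input_bytes(tokens: &[Token], max_input_bytes: usize) -> Vec<&[Token]> {
    let mut blocks = Vec::new();
    let (mut start, mut covered) = (0, 0);
    for (i, t) in tokens.iter().enumerate() {
        covered += t.input_len();
        if covered >= max_input_bytes {
            blocks.push(&tokens[start..=i]);
            start = i + 1;
            covered = 0;
        }
    }
    if start < tokens.len() {
        blocks.push(&tokens[start..]); // final partial block
    }
    blocks
}
```

For example, 100 maximum-length (258-byte) matches form a single block under a 16383-symbol limit, but four blocks under a 0x2000-byte limit, since 32 such matches already exceed 8192 input bytes.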

When discussing performance, it helps to compare against multiple alternatives, not just flate2. I put together a criterion benchmark on the first 20 MiB of enwik8 (English Wikipedia, the standard compression benchmark corpus):

Compressor     Ratio   Compress    Decompress   Threads
lz4_flex       1.73x   309 MiB/s   1.70 GiB/s   1
zstd level-1   2.44x   263 MiB/s   1.06 GiB/s   1
zstd level-3   2.80x   143 MiB/s    851 MiB/s   1
flate3         2.62x    35 MiB/s    118 MiB/s   3 / 1
flate2         2.72x    23 MiB/s    292 MiB/s   1
miniz_oxide    2.72x    23 MiB/s    316 MiB/s   1

flate3 compresses ~36% faster than flate2, but uses 3 threads to get there. Single-threaded zstd at level 1 reaches a similar ratio (2.44x vs 2.62x) at 8x the throughput. The match-finder channel (bchan or otherwise) is not the bottleneck here; the deflate algorithm itself is.

BENCH_INPUT=<path> lets you point the benchmark at any file. Run cargo bench --bench compress to reproduce.