Ring buffer implementations

I see there are several ring buffer crates as well as VecDeque. I wonder if anyone has experience of using one or more. This is what I need to be able to do which is what the C lib I have does.

One writer and one reader for each ring buffer.
Reader and writer will be on different threads.
Data will be u8.
Fixed size (quite large).
Ideally ability to write and read a block of data to/from a buffer rather than iterate through the bytes.
Ability to ask write space and read space available. So no growth, if no write space we drop blocks and if no read space we skip processing that cycle.

Looks like this can be solved by using only the standard library:

You can wrap the VecDeque in an RwLock.

VecDeque implements io::Read and io::Write.

You can pre-allocate a given size buffer using VecDeque::with_capacity().

VecDeque has len() and capacity() which can be used for limiting the number of bytes to read and write, respectively.

1 Like

Given the fact that "reader" actually modifies the VecDeque, a Mutex is probably a better choice.

2 Likes

Right, in fact I realized that while implementing it myself.

2 Likes

Hi @H2CO3, I'm still don't fully grok the Read and Write traits, Could you explain in more layman terms what @radiobob wanted here? and why calls to push_ and pop_ don't suffice? is it just a convenience, or perhaps also more efficient, than successive calls to pop_front (in the case where you may want to Read a number of values?)

They do, but if you know that you have an array of raw bytes, you can just memcpy a whole bunch of bytes all at once, which is why a Read/Write impl is potentially more efficient. I say potentially because there's no reason a sufficiently smart compiler can't optimize away repeated pushes/pops to a VecDeque<u8> and replace them with memcpy, it's just not strictly speaking guaranteed.

Also, read/write is needed if you want to interoperate with an API that needs those traits.

2 Likes

Thanks for the info. I am still struggling with Rust so I just keep trying until it compiles and then it normally works although I'm never quite sure whats going on under the covers.
This is my first attempt at understanding how this will work in a simple test program. I would appreciate a critique of that before I move it further towards what I want to achieve.

use std::collections::VecDeque;
use std::thread;
use std::time::Duration;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::MutexGuard;

fn main() {
    let mut deque: VecDeque<u8> = VecDeque::with_capacity(10000);
    let sh_deque = Arc::new(Mutex::new(deque));
    

    let arc1 = sh_deque.clone();
    let sender_join_handle = thread::spawn(  move || {
        rb_sender(& arc1);
    });

    thread::sleep(Duration::from_millis(1000));

    let arc2 = sh_deque.clone();
    let receiver_join_handle = thread::spawn(  move  || {
        rb_reader(& arc2);
    });

    thread::sleep(Duration::from_millis(1000));
    sender_join_handle.join();
    receiver_join_handle.join();
}

fn rb_sender(o: &Arc<Mutex<VecDeque<u8>>>) {
    let q = &mut *o.lock().unwrap();

    println!("Capacity sender {}", q.capacity());
    println!("Length sender {}", q.len());
    println!("Is empty sender {}", q.is_empty());

    q.push_back(1);
    q.push_back(2);
    q.push_back(3);
}

fn rb_reader(o: &Arc<Mutex<VecDeque<u8>>>) {
    let q = &mut *o.lock().unwrap();

    println!("Capacity reader {}", q.capacity());
    println!("Length reader {}", q.len());
    println!("Is empty reader {}", q.is_empty());

    println!("Data {:?},{:?},{:?}", q.pop_front(), q.pop_front(), q.pop_front());
}

Did you check my implementation above?

No. I wanted to try and work it out myself but if you think it would help now I will take a look.

OK, so I looked. That was my intention to lift the level up and just have what I needed. I'm operating at a much lower experience level so it will take me a while to work out what you did. I assume I can wrap SyncByteRingBuf in an Arc and pass it to my threads. I did have a problem with some compile errors. Maybe Rust needs updating. A few errors like this.

error[E0599]: no method named read_exact found for struct MutexGuard<'_, VecDeque<u8>> in the current scope
--> src\main.rs:62:20
|
62 | self.0.read_exact(buf).map(|_| buf.len())
| ^^^^^^^^^^ method not found in MutexGuard<'_, VecDeque<u8>>

Updated Rust and runs now.

Nothing special at all. Most of the code in that playground doesn't actively do much of anything, actually.

I merely created three wrapper types: one around Mutex<VecDeque<u8>>, and two around MutexGuard (one for reading, one for writing, but these could have been just one type), in order to encapsulate the logic of skipping reading/writing when the buffer is empty/full, respectively.

Sure. SyncByteRingBuf is Send + Sync.

Probably misunderstanding how this works. To my mind a ring buffer has a read and write pointer. I should not be able to write data if it would overwrite data that has not been read. Conversely I should not be able to read data if the amount I want to read would overlap the write pointer and therefore partially read old data. I just added a small bit of code to main assuming that as the buffer is filled by the write try_write() should say no, and as I've read a full buffer try_read() should say no but both say OK to read and write.

fn main() -> io::Result<()> {
    let buf = SyncByteRingBuf::with_capacity(1024);
    
    let outdata: Vec<u8> = (0..=255).cycle().take(1024).collect();
    buf.write().write_all(&outdata)?;
    let r = buf.try_write();
    match r {
        Ok(m) => println!("OK to write"),
        Err(e) => println!("No write space {:?}", e),
    }
    
    let mut indata = vec![0; 1024];
    buf.read().read_exact(&mut indata)?;
    let r = buf.try_read();
    match r {
        Ok(m) => println!("OK to read"),
        Err(e) => println!("No read space {:?}", e),
    }
    
    assert_eq!(indata, outdata);
    
    Ok(())
}

My SyncByteRingBuf::try_() methods simply wrap the try_lock() method of Mutex, i.e., they return an error if the mutex can't be acquired without blocking. They have nothing to do with whether the buffer is full. Those conditions are checked by the returned guard objects' read() and write() methods.

Of course, it is possible to modify the methods in such a way that they return an error when the buffer is empty/full, but I'm feeling that this would conflate the two problems.

OK, so this is getting a bit closer. I think the last thing I need is that the read and write size are not the same which is where a ring buffer is a much better solution than linear buffers. I can see there is a read_exact which needs a ReadBuff but this looks like a new feature and I'm not sure how to use it.

fn main() -> io::Result<()> {
    let buf = SyncByteRingBuf::with_capacity(1024);
    
    for i in 0..10 {
        let outdata: Vec<u8> = (0..=255).cycle().take(200).collect();
        buf.write().write_all(&outdata)?;
        let r = buf.try_write();
        match r {
            Ok(mut m) => {
                let r = m.write(&outdata);
                match r {
                    Err(e) => println!("Error on write {:?}", e),
                    Ok(sz) => println!("Wrote {:?}", sz)
                }
            }
            Err(e) => println!("Lock error {:?}", e),
        }
        
        //let mut inbuf: ReadBuff =  vec![0; 200];
        let mut indata = vec![0; 200];
        buf.read().read_exact(&mut indata)?;
        let r1 = buf.try_read();   
        match r1 {
            Ok(mut m) => {
                //let r2 = m.read_buf_exact(&mut indata);
                let r2 = m.read(&mut indata);
                match r2 {
                    Ok(sz) => println!("Read {:?}", sz),
                    Err(e) => println!("Error on read {:?}", e),
                    
                }
            }
            Err(e) => println!("Lock error {:?}", e),
        }
    }
    

Actually scrap that. It's already done if I change the read buffer size. I think it will do nicely now so I thank you for the code .

This is either already how it is, or ir contradicts with your other requirements, depending on what you exactly mean (which I can't tell):

  • My implementation is already such that it tracks the number of initialized elements (.available()) and the number of uninitialized ones (.remaining()) separately. These are not the same, but they are mutually dependent, since they necessarily sum to the capacity (in fact that's how the write size is calculated). I don't see how they could be even more independent than that.
  • If you mean that you need to read and write an arbitrarily large amount of bytes at once, then that's only possible by allowing the buffer to reallocate. A ring buffer doesn't magically give you more/infinite memory. But one of your other requirements was that the buffer must be of a constant size, in which case this is simply impossible.

Maybe I explained it badly. The buffer is a closed ring and both read and write start at the same point in the buffer. The ring is a fixed size calculated to be enough to buffer data that is flowing through the system without usually causing data loss or imposing a noticeable delay. Data is written in chunks of around 750 bytes as it comes off the hardware, the write frequency is quite high. Its pulled out of the ring in much larger chunks of several K for processing. The processing of course takes somewhat longer than the write cycles of the incoming data. However, incoming data should always be able to be written otherwise data is dropped (bad). Reading the data expects to wait until enough is available for processing otherwise processing is probably taking too long.

That's understandable, but how do you reconcile that with the requirement of the fixed-size buffer? Especially if readers are actively throttled, you can't really guarantee that all incoming data fit inside a fixed amount of space.

Its a balancing act. This is a radio implementation which I wrote in C some years ago. The UDP data to the radio has various info including the frequency. The frequency can be changed rapidly and the received audio should follow closely enough that there is minimal lag so the buffer has to be sized such that if read and write were separated by the maximum amount the lag is acceptable. If there is some hiccup in the processing it's much better to drop some data which will just cause an annoying click in the audio than increase the buffer and therefore the lag and probably never catch up.