Ring buffer implementations

Ok, then dropping data is acceptable, after all? In that case, you can just do this in a loop:

  • try to read as much data as possible
    • if it fails, then the buffer is empty
  • try to write data
    • if it fails, then the incoming packet is bigger than the buffer capacity, so drop it
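The loop above can be sketched roughly as follows. This is only an illustration of the drop policy, single-threaded and with no locking: `BoundedBuf`, `read_all`, `try_write`, and `step` are hypothetical names invented here, not part of any crate mentioned in this thread.

```rust
use std::collections::VecDeque;

/// Hypothetical fixed-capacity byte buffer, for illustration only.
struct BoundedBuf {
    data: VecDeque<u8>,
    capacity: usize,
}

impl BoundedBuf {
    fn new(capacity: usize) -> Self {
        Self { data: VecDeque::with_capacity(capacity), capacity }
    }

    /// Moves everything currently buffered into `out`.
    /// Returns the number of bytes read; 0 means the buffer was empty.
    fn read_all(&mut self, out: &mut Vec<u8>) -> usize {
        let n = self.data.len();
        out.extend(self.data.drain(..));
        n
    }

    /// Writes the whole packet or nothing.
    /// Returns false when the packet does not fit and is therefore dropped.
    fn try_write(&mut self, packet: &[u8]) -> bool {
        if packet.len() > self.capacity - self.data.len() {
            return false;
        }
        self.data.extend(packet.iter().copied());
        true
    }
}

/// One iteration of the loop: drain first, then try to store the new packet.
/// Because the buffer is empty after the drain, a failed write means the
/// packet is larger than the total capacity.
fn step(buf: &mut BoundedBuf, out: &mut Vec<u8>, packet: &[u8]) -> bool {
    buf.read_all(out);
    buf.try_write(packet)
}
```

Calling `step` repeatedly with incoming packets keeps every packet that fits and silently drops the oversized ones.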

I think you might be interested in using a Channel for this particular purpose.

Since all you seem to want is a push/pop operation, you can:

  • Safely send the Receiver and Sender across threads.
  • Create a channel for u8.
  • Use bounded() to create a channel with limited capacity.
  • Unfortunately, you cannot read a block of bytes at a time, but something like my_vec.extend(recv.try_iter()) achieves a similar result.
  • Query .len() for the number of messages in the channel, and either remember the capacity yourself or query .capacity().

Note you cannot change the capacity of this object, unlike a ring buffer under a mutex.
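As a rough sketch of the channel approach, the code below uses the standard library's `std::sync::mpsc::sync_channel` as a stand-in bounded channel. That choice is an assumption made so the example needs no external crate: the bullets above seem to describe a crossbeam-style `bounded()` API, and the std channel has no `.len()` or `.capacity()`. The function names `pump_bytes` and `full_channel_rejects` are made up for illustration.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};
use std::thread;

/// Sends `input` byte by byte through a bounded channel with `capacity`
/// slots and collects it on the receiving side in blocks.
fn pump_bytes(input: Vec<u8>, capacity: usize) -> Vec<u8> {
    let (tx, rx) = sync_channel::<u8>(capacity);

    let producer = thread::spawn(move || {
        for byte in input {
            // send() blocks while the channel is full.
            tx.send(byte).expect("receiver hung up");
        }
        // `tx` is dropped here, which closes the channel.
    });

    let mut collected = Vec::new();
    // recv() blocks until a byte arrives; Err means the sender is gone
    // and the channel is empty.
    while let Ok(first) = rx.recv() {
        collected.push(first);
        // Grab whatever else is already queued, without blocking.
        collected.extend(rx.try_iter());
    }
    producer.join().expect("producer panicked");
    collected
}

/// try_send() on a full channel returns `Full` instead of blocking,
/// which is where a "drop the packet" policy would hook in.
fn full_channel_rejects() -> bool {
    let (tx, _rx) = sync_channel::<u8>(1);
    tx.try_send(1).expect("first byte fits");
    matches!(tx.try_send(2), Err(TrySendError::Full(2)))
}
```

Sending one `u8` per message is simple but carries per-message overhead; sending `Vec<u8>` blocks over the same channel is a common alternative when throughput matters.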

Thanks for the suggestions. Way back I tried using shared arrays (in C) but the management was difficult. I then found a ring buffer implementation and it was way easier. I will be using about 3 ring buffers between different parts of the application, with a variety of read/write sizes. I use channels as well, but for control and command messages rather than data. I'm still not convinced I will get to the end of this application, as there are two further major parts to do. One is FFI to a C library that does all the signal processing; the other is a whole load of GUI work which requires some pretty hairy graphics. It's interesting though and will keep me out of trouble over the winter!

Thanks for the implementation. I've integrated that into my application and it's working great. As the data throughput is high, I had to precisely synchronise the threads using a Condvar to avoid either end getting bounced. I couldn't tolerate even a microsecond sleep on the writer side, and the reader side had to be kicked at exactly the right time: if it ran even an instruction early, it would occasionally fail to get a write lock. Next up is FFI, so that will be interesting.

I ran into a slight problem. My write size was 1008 bytes and my read size was 16384 bytes. The way I had it, using try_write() and try_read(), the read lock was held too long, so the write got sporadic 'WouldBlock' errors and the block had to be skipped. I changed it to just write and read without taking locks, which is the way I think ring buffers should work, as the two ends should never overlap. I wonder why Rust lets me get away with that, and under what circumstances you would need to take a lock.
It works without dropping any blocks now, and I get data back from my signal processing C lib, so it all looks good so far.

I thought you were using multiple threads for reading and writing? That's concurrent modification of the buffer, so it must be synchronized.

If you aren't locking, and your code compiles, this can mean two things:

  • you aren't actually accessing the buffer from multiple threads, or
  • you are using unsafe to work around having to lock it. That is wrong, because it is a race condition.

I'm not sure what you mean by "Rust lets me get away with [it]". Get away with what, exactly? What do you think should or shouldn't happen?
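For illustration, here is a minimal sketch of the synchronized case. Safe Rust will not compile code that hands a plain mutable buffer to two threads, so the buffer has to be wrapped in something like `Arc<Mutex<...>>`. The `transfer` function and its busy-waiting reader are assumptions made for the example, not the code from this thread.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::thread;

/// Moves all bytes in `chunks` from a writer thread to the calling thread
/// through a shared, mutex-protected queue.
fn transfer(chunks: Vec<Vec<u8>>) -> Vec<u8> {
    let total: usize = chunks.iter().map(Vec::len).sum();
    let shared = Arc::new(Mutex::new(VecDeque::<u8>::new()));

    let writer_buf = Arc::clone(&shared);
    let writer = thread::spawn(move || {
        for chunk in chunks {
            // The lock is held only for the duration of this statement.
            writer_buf.lock().unwrap().extend(chunk);
        }
    });

    let mut received = Vec::with_capacity(total);
    while received.len() < total {
        let mut guard = shared.lock().unwrap();
        received.extend(guard.drain(..));
        drop(guard); // release the lock before yielding to the writer
        thread::yield_now();
    }
    writer.join().unwrap();
    received
}
```

Removing the `Mutex` and mutating the `VecDeque` directly from both threads is rejected at compile time, which is why code that compiles without `unsafe` must be locking somewhere.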

Yes, reading and writing are in different threads, I'm not using unsafe, and it compiles and runs without errors.
This is the thread that writes to the ring buffer in chunks of 1008 bytes.

let r = self.rb_iq.write().write(&vec_iq);
match r {
    Err(e) => {
        println!("Write error on rb_iq, skipping block {:?}", e);
    }
    Ok(_sz) => {
        success = true;
    }
}

if success {
    // Signal the pipeline that data is available
    let mut locked = self.iq_cond.0.lock().unwrap();
    *locked = true;
    self.iq_cond.1.notify_one();
}

This is the thread that reads the ring buffer in chunks of 16384 bytes.

let mut locked = self.iq_cond.0.lock().unwrap();
let result = self.iq_cond.1.wait_timeout(locked, Duration::from_millis(100)).unwrap();
locked = result.0;
// Why were we woken?
if *locked {
    // We were signaled so data available
    *locked = false;
    if self.rb_iq.read().available() >= (common_defs::DSP_BLK_SZ * common_defs::BYTES_PER_SAMPLE) as usize {
        let iq_data = self.rb_iq.read().read(&mut self.iq_data);
        match iq_data {
            Ok(_sz) => {
                action = ACTIONS::ActionData;
                //println!("Read {:?} bytes from rb_iq", _sz);
            }
            Err(e) => println!("Read error on rb_iq {:?}. Skipping cycle.", e),
        }
    }
} else {
...

I think there is enough code there to get the idea. If I have to use the Mutex lock, then Rust is unfortunately too slow for this application, as the time to read a block causes the next write to fail often enough to be a problem.

But you are locking here: the .read() and .write() methods of the ring buffer perform the locking.

OK. So what is the purpose of the try_read() and try_write() which I was using before switching to read() and write()? It was the try_write() that was coming back with 'WouldBlock', as I guess try_read() was still holding a lock.

The try_xxx() methods return an error if they can't immediately acquire the lock, while read() and write() block (wait) until the lock is available.
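The difference can be seen in a small sketch that uses `std::sync::RwLock` as a stand-in. This is an assumption: the lock type inside the ring buffer used in this thread is not shown, but the standard `RwLock` has the same `read()`/`write()`/`try_read()`/`try_write()` split. The two function names are hypothetical.

```rust
use std::sync::{RwLock, TryLockError};

/// Returns true if try_write() reports WouldBlock while a read guard is alive.
fn try_write_blocked_by_reader() -> bool {
    let lock = RwLock::new(vec![0u8; 16]);
    let reader = lock.read().unwrap(); // hold a read lock
    // try_write() does not wait; it fails immediately with WouldBlock.
    // A plain write() here would wait until `reader` is dropped.
    let blocked = matches!(lock.try_write(), Err(TryLockError::WouldBlock));
    drop(reader);
    blocked
}

/// Once no guard is held, try_write() succeeds right away.
fn try_write_succeeds_when_free() -> bool {
    let lock = RwLock::new(vec![0u8; 16]);
    let acquired = lock.try_write().is_ok();
    acquired
}
```

So a long read that holds its guard makes every `try_write()` issued in that window fail, whereas `write()` simply waits for the reader to finish.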

Thanks. I understand what's happening now.

This whole thread is a bit frustrating to read. Lock-free, fixed-capacity ring buffers are a well-known solution for communication between threads in single-producer, single-consumer scenarios, and yet most of this thread is about using VecDeque, a container which is simply not capable of being used this way.

Find a dedicated lock-free ring buffer crate. Here's one used internally by crossbeam.

Features I see relevant to the discussion are:

Thanks for the heads-up. I'm only starting out, so I just asked for advice here. Yes, I agree ring buffers should be lock-free, as is the one I use in the C code I'm porting to Rust. I will certainly look at that once I get to a stable point in the application.

I have written an example of code using the ringbuf crate after figuring out its API.

Here's an example of a simple producer and consumer.

Add to Cargo.toml

[dependencies]
ringbuf = "0.3"
crossbeam = "0.8"

src/main.rs

use std::io::{self, Read, Write};
use std::time::Duration;

type MyRingBuf = ringbuf::StaticRb<u8, 4096>;

const WRITE_INTERVAL: Duration = Duration::from_micros(10);
const READ_INTERVAL: Duration = Duration::from_micros(200);

fn main() {
    let mut ringbuf = MyRingBuf::default();
    let (producer, consumer) = ringbuf.split_ref();

    crossbeam::scope(|scope| {
        scope.spawn(move |_| producer_thread(producer));
        scope.spawn(move |_| consumer_thread(consumer));
    }).unwrap();
}

fn producer_thread(mut producer: ringbuf::Producer<u8, &MyRingBuf>) {
    loop {
        match producer.write_all(b"Hello!") {
            Ok(()) => {},
            Err(e) if e.kind() == io::ErrorKind::WouldBlock => panic!("consumer isn't reading fast enough!"),
            Err(e) => panic!("{e}"),
        }
        std::thread::sleep(WRITE_INTERVAL);
    }
}

fn consumer_thread(mut consumer: ringbuf::Consumer<u8, &MyRingBuf>) {
    let mut buffer = [0; 256];
    loop {
        let num_read = match consumer.read(&mut buffer) {
            Err(e) if e.kind() == io::ErrorKind::WouldBlock => continue,
            Err(e) => panic!("{e}"),
            // FIXME: ringbuf::Consumer never produces the Ok(0) EOF signal since the channel can't be "closed".
            //        You may need an additional communication channel to tell the consumer thread when to stop.
            Ok(0) => unreachable!(),
            Ok(num_read) => num_read,
        };
        let ascii_bytes = &buffer[..num_read];
        // arbitrary utf-8 could end mid-character, but ascii won't
        let ascii = std::str::from_utf8(ascii_bytes).expect("not UTF-8");
        print!("@@@@{ascii}");
        std::thread::sleep(READ_INTERVAL);
    }
}

This is a pretty rough example, panicking as soon as writing would block. The point, though, is to demonstrate that it really does not block, so long as the reader is able to keep up. On my PC this keeps printing things and running continuously, and never hits the panic in the producer. (If, on the other hand, I add a bit of code to lock a Mutex during the read, and then try_lock().unwrap() the mutex during the write, then eventually it panics with WouldBlock.)

As noted in a comment, Producer doesn't have e.g. a close() method, nor does it do anything to the ring buffer to signal EOF when it is dropped, so you may need another communication channel (e.g. some AtomicBool) to tell the consumer when to stop.
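A minimal sketch of such a stop flag, using only the standard library, might look like this. The ring buffer itself is left out, and `run_consumer_until_stopped` and the 200 µs poll interval are assumptions made for the example.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Runs a polling consumer loop until the stop flag is raised.
/// Returns how many times the consumer polled.
fn run_consumer_until_stopped(run_for: Duration) -> u64 {
    let stop = Arc::new(AtomicBool::new(false));
    let consumer_stop = Arc::clone(&stop);

    let consumer = thread::spawn(move || {
        let mut polls = 0u64;
        loop {
            // A real consumer would call something like consumer.read(...) here.
            polls += 1;
            // Check the flag after each poll so the loop can end cleanly.
            if consumer_stop.load(Ordering::Acquire) {
                break;
            }
            thread::sleep(Duration::from_micros(200));
        }
        polls
    });

    // The producer side: do some work, then tell the consumer to stop.
    thread::sleep(run_for);
    stop.store(true, Ordering::Release);
    consumer.join().expect("consumer panicked")
}
```

Because the consumer polls once more before checking the flag, data written just before the flag is raised still gets a chance to be read.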

Thanks, saved away for a revisit.
