Concurrent read writer

Hi!

This is my first time posting here, so I hope this is the right place to get help.

I am trying to write a type that implements Read and Write. Internally, this type manages a fixed-length buffer where writes/reads go to/from. The idea is that I can have two threads - a reader and a writer - accessing this buffer simultaneously; if the buffer is empty and one thread tries to read, then it blocks until the writer thread puts some bytes in the buffer. Similarly, if the writer tries to write but there's no space left in the buffer it blocks until the reader drains some bytes from the buffer. This synchronization is achieved through a couple of condition variables plus a mutex. Additionally, there's some looping in there. Say the buffer is 100 bytes long, but I want to write 200. Then, the writer can write 100 bytes, wait for the reader to read them, and then it can write the next 100 bytes. This is done without the caller doing anything fancy; all it knows is that it wrote 200 bytes.

I am able to implement this type in, say, Golang, just fine: one thread can be inside the read() method while the other is inside the write() method and things work nicely, because those methods know how to synchronize access to the shared buffer and how to loop to achieve the semantics I described above. However, in Rust, I'm hitting a wall.

The thing is, Read and Write require &mut self, which means (as far as I understand things; I'm pretty new to Rust) that I need to wrap it with a Mutex. The problem is that, if one thread is holding the mutex in order to call, say, read(), then the other thread will never be able to call write() until the first one releases the lock. Then, if I cannot have the two threads "inside" the IO methods at the same time, then the looping semantics above would have to be pushed to the client code, and I don't want this.

Hopefully this makes sense and someone here can provide some pointers!

you can check crates like ringbuffer-spsc or spsc-bip-buffer for inspiration.

the trick here is to split the state into different types (as opposed to putting all the state into a single monolithic type), e.g. one type for the reader, one type for the writer, and one type for the shared state. typically the reader and writer are just wrappers around a reference to the shared state. you can use borrowed references (usable for structured concurrency like std::thread::scope), or use Arc, which can be thought of as a kind of "owned" reference.

the API possibly looks like:

use std::io::{self, Read};
use std::sync::Arc;

// the shared states
struct Buffer {
    //...
}
// scoped (borrowed) reader and writer
struct ScopedReader<'a>(&'a Buffer);
struct ScopedWriter<'a>(&'a Buffer);
// owned reader and writer
struct Reader(Arc<Buffer>);
struct Writer(Arc<Buffer>);

impl Buffer {
    pub fn new() -> Self { todo!() }
    pub fn split_scoped(&mut self) -> (ScopedReader<'_>, ScopedWriter<'_>) { todo!() }
    pub fn split_owned(self) -> (Reader, Writer) { todo!() }
    // actual implementation, note `&self` instead of `&mut self`
    // these might be kept private and expose public API through writer and reader
    // also these don't need to block, instead let the reader and writer choose what
    // policy to use when buffer is full or empty, e.g. spin, condvar, async, etc
    fn read(&self, buf: &mut [u8]) -> usize { todo!() }
    fn write(&self, buf: &[u8]) -> usize { todo!() }
}

// implement the trait on the reader type instead of the buffer type
// same for the writer type (`impl Write for Writer`)
impl Read for Reader {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        todo!()
    }
}

note the synchronization mechanism used when writing to a full buffer (or reading from an empty buffer) is orthogonal to the buffer reading and writing itself. you can implement it separately (and combine the two in the reader and writer), or you can bake the park/unpark into the buffer itself.

also note, because the actual backing storage must be shared between the reader and writer, some form of interior mutability or unsafe code is unavoidable. the trade off is mainly safety/maintainability vs performance. also make sure to use the right synchronization primitives too. for a single producer single consumer queue, it is enough to use AtomicUsize, but you do need to understand the atomic memory order semantics.
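to make the safe end of that trade-off concrete, here is a minimal sketch that uses a Mutex plus two Condvars as the interior mutability, roughly what the original question describes. every name in it (State, Buffer, Reader, Writer, pipe, pipe_demo) is made up for illustration and is not taken from any crate. it relies on Condvar::wait releasing the mutex while blocked, so the two threads can be "inside" read() and write() at the same time, and on the standard Write::write_all to do the looping when the data is larger than the buffer.

```rust
use std::collections::VecDeque;
use std::io::{self, Read, Write};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

// Mutable state guarded by the mutex (the "interior mutability" part).
struct State {
    queue: VecDeque<u8>,
    writer_closed: bool,
}

// Shared between both halves; all names here are hypothetical.
struct Buffer {
    state: Mutex<State>,
    capacity: usize,
    not_empty: Condvar,
    not_full: Condvar,
}

struct Reader(Arc<Buffer>);
struct Writer(Arc<Buffer>);

fn pipe(capacity: usize) -> (Reader, Writer) {
    assert!(capacity > 0, "a zero-capacity buffer could never make progress");
    let state = State { queue: VecDeque::with_capacity(capacity), writer_closed: false };
    let shared = Arc::new(Buffer {
        state: Mutex::new(state),
        capacity,
        not_empty: Condvar::new(),
        not_full: Condvar::new(),
    });
    (Reader(Arc::clone(&shared)), Writer(shared))
}

impl Read for Reader {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        if buf.is_empty() {
            return Ok(0);
        }
        let mut st = self.0.state.lock().unwrap();
        // Waiting releases the mutex, so the writer can get in meanwhile.
        while st.queue.is_empty() {
            if st.writer_closed {
                return Ok(0); // end of stream
            }
            st = self.0.not_empty.wait(st).unwrap();
        }
        let n = buf.len().min(st.queue.len());
        for (dst, byte) in buf.iter_mut().zip(st.queue.drain(..n)) {
            *dst = byte;
        }
        self.0.not_full.notify_one();
        Ok(n)
    }
}

impl Write for Writer {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        if buf.is_empty() {
            return Ok(0);
        }
        let mut st = self.0.state.lock().unwrap();
        while st.queue.len() == self.0.capacity {
            st = self.0.not_full.wait(st).unwrap();
        }
        // A partial write is fine: `write_all` loops until everything is in.
        let n = buf.len().min(self.0.capacity - st.queue.len());
        st.queue.extend(buf[..n].iter().copied());
        self.0.not_empty.notify_one();
        Ok(n)
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

impl Drop for Writer {
    fn drop(&mut self) {
        // Wake a reader that is blocked on an empty buffer so it sees EOF.
        self.0.state.lock().unwrap().writer_closed = true;
        self.0.not_empty.notify_all();
    }
}

// Writes 200 bytes through a 100-byte buffer and returns what was read.
fn pipe_demo() -> Vec<u8> {
    let (mut reader, mut writer) = pipe(100);
    let producer = thread::spawn(move || {
        let data: Vec<u8> = (0..200u8).collect();
        writer.write_all(&data).unwrap();
        // `writer` is dropped here, which signals end of stream.
    });
    let mut out = Vec::new();
    reader.read_to_end(&mut out).unwrap();
    producer.join().unwrap();
    out
}
```

the caller only sees write_all(&data) and read_to_end(&mut out); the blocking and looping stay inside the two wrapper types. one thing this sketch leaves out: if the Reader is dropped while the Writer is blocked on a full buffer, the writer waits forever, so a real implementation would probably want a reader-closed flag as well.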

Wow, I would have never thought of such a thing. Thanks so much for the reply, this was very enlightening.

I have a few follow-ups, though.

First: my knowledge of Rust is limited to the O'Reilly book, which came out in 2018, IIRC, so it looks like my understanding is pretty outdated. Do you have any suggestions for how to keep up to date with things like scoped threads, which I had never heard of before your post?

Second: let me see if I follow all this. What you're saying is that the Buffer type can implement read and write methods which don't require &mut self. Then, I can get a pair of reader/writer (either owned or scoped), which presumably have a reference to the Buffer, so they can call the read and write methods with that reference. Since I'm writing and reading to the same memory, then I need to synchronize things somehow. My first instinct would have been to use the usual stuff (Arc + Mutex and Condvar), but you suggest using atomics. I am familiar with atomics, but I'm not sure how they would be useful here. Would you mind elaborating? And, also, if anything I've said above is incorrect/inaccurate, I would greatly appreciate the correction :grin:

The official Rust Blog discusses major features in each release. For example, here is the post introducing scoped threads.
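For a quick feel of what scoped threads allow, here is a small sketch using std::thread::scope (stable since Rust 1.63). The Shared, ScopedReader, ScopedWriter, split and scoped_demo names are invented for this example. The point it tries to show is that threads spawned inside the scope may borrow a local value, because the scope joins all of them before it returns.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;

// Hypothetical shared state: a single atomic counter.
struct Shared {
    counter: AtomicUsize,
}

// Borrowed handles; neither owns the shared state.
struct ScopedReader<'a>(&'a Shared);
struct ScopedWriter<'a>(&'a Shared);

// Taking `&mut Shared` guarantees nobody else holds a handle already.
fn split(shared: &mut Shared) -> (ScopedReader<'_>, ScopedWriter<'_>) {
    let shared = &*shared;
    (ScopedReader(shared), ScopedWriter(shared))
}

fn scoped_demo() -> usize {
    let mut shared = Shared { counter: AtomicUsize::new(0) };
    let (reader, writer) = split(&mut shared);

    thread::scope(|s| {
        // Writer thread: bumps the counter 1000 times.
        s.spawn(move || {
            for _ in 0..1000 {
                writer.0.counter.fetch_add(1, Ordering::Release);
            }
        });
        // Reader thread: polls until it has seen all increments.
        s.spawn(move || {
            while reader.0.counter.load(Ordering::Acquire) < 1000 {
                thread::yield_now();
            }
        });
    }); // both threads are joined here, so the borrows have ended

    shared.counter.load(Ordering::Relaxed)
}
```

No Arc is needed here because the compiler can see that the borrowed handles never outlive `shared`. With std::thread::spawn the same code would be rejected, since a spawned thread must be `'static`.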

as pointed out by @kpreid above, there will be announcements on the official rust blog, and there's an RSS feed you can subscribe to. you can also check the RELEASES file in the repo if you want to.

that's correct. also note the shared Buffer type doesn't need to have read() and write() methods with the same signatures as Reader and Writer. e.g. it can implement some (maybe unsafe) primitives as building blocks for Reader and Writer, so the same Buffer can potentially support different flavors of Reader/Writer pairs such as blocking and async.

using atomics in single producer single consumer concurrent queues is a well established technique I would say, not just in rust. you will get a ton of results if you google keywords like spsc, ringbuffer, lockfree, concurrent, etc. the core idea is this: since there is a single producer and a single consumer, the head and tail indices are each written by a single thread (no write-write race condition), so you just need to use the correct memory ordering when loading and updating the atomic indices (the compiler will insert proper memory barriers as necessary) to ensure the data gets properly published to the other thread.
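a rough sketch of that idea, under stated assumptions: the names (Ring, Producer, Consumer, ring, try_write, try_read, spsc_demo) are invented here and this is not the code of either crate mentioned above. the capacity is required to be a power of two so that the ever-growing counters can wrap safely. the producer writes bytes and then stores tail with Release; the consumer loads tail with Acquire before reading those bytes, and the same pairing is used in the other direction for head.

```rust
use std::cell::UnsafeCell;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

struct Ring {
    // Backing storage; UnsafeCell permits writes through a shared reference.
    data: Box<[UnsafeCell<u8>]>,
    head: AtomicUsize, // total bytes read; written only by the consumer
    tail: AtomicUsize, // total bytes written; written only by the producer
}

// SAFETY: sound only with exactly one Producer and one Consumer, which the
// non-Clone handles and their `&mut self` methods are meant to guarantee.
unsafe impl Sync for Ring {}

struct Producer(Arc<Ring>);
struct Consumer(Arc<Ring>);

fn ring(capacity: usize) -> (Producer, Consumer) {
    assert!(capacity.is_power_of_two(), "capacity must be a power of two");
    let data: Box<[UnsafeCell<u8>]> = (0..capacity).map(|_| UnsafeCell::new(0u8)).collect();
    let shared = Arc::new(Ring { data, head: AtomicUsize::new(0), tail: AtomicUsize::new(0) });
    (Producer(Arc::clone(&shared)), Consumer(shared))
}

impl Producer {
    // Non-blocking: copies as many bytes as currently fit, returns the count.
    fn try_write(&mut self, src: &[u8]) -> usize {
        let ring = &*self.0;
        let tail = ring.tail.load(Ordering::Relaxed); // only this side writes it
        let head = ring.head.load(Ordering::Acquire); // observe consumer progress
        let free = ring.data.len() - tail.wrapping_sub(head);
        let n = src.len().min(free);
        for (i, &byte) in src[..n].iter().enumerate() {
            let slot = tail.wrapping_add(i) % ring.data.len();
            // SAFETY: free slots are not touched by the consumer.
            unsafe { *ring.data[slot].get() = byte };
        }
        ring.tail.store(tail.wrapping_add(n), Ordering::Release); // publish
        n
    }
}

impl Consumer {
    // Non-blocking: copies out as many bytes as are available.
    fn try_read(&mut self, dst: &mut [u8]) -> usize {
        let ring = &*self.0;
        let head = ring.head.load(Ordering::Relaxed);
        let tail = ring.tail.load(Ordering::Acquire); // observe published bytes
        let n = dst.len().min(tail.wrapping_sub(head));
        for (i, out) in dst[..n].iter_mut().enumerate() {
            let slot = head.wrapping_add(i) % ring.data.len();
            // SAFETY: published slots are not touched by the producer.
            *out = unsafe { *ring.data[slot].get() };
        }
        ring.head.store(head.wrapping_add(n), Ordering::Release); // free the slots
        n
    }
}

// Pushes `input` through a 16-byte ring on another thread and collects it.
fn spsc_demo(input: Vec<u8>) -> Vec<u8> {
    let total = input.len();
    let (mut tx, mut rx) = ring(16);
    let producer = thread::spawn(move || {
        let mut sent = 0;
        while sent < total {
            let n = tx.try_write(&input[sent..]);
            if n == 0 {
                thread::yield_now(); // buffer full: spin politely
            }
            sent += n;
        }
    });
    let mut out = Vec::with_capacity(total);
    let mut chunk = [0u8; 8];
    while out.len() < total {
        let n = rx.try_read(&mut chunk);
        if n == 0 {
            thread::yield_now(); // buffer empty
        }
        out.extend_from_slice(&chunk[..n]);
    }
    producer.join().unwrap();
    out
}
```

the demo spins with yield_now when the ring is full or empty; that is only one possible waiting policy, and a blocking Reader/Writer pair could instead park the thread or wait on a condvar around the same two primitives.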

just be careful to avoid UB when implementing in rust. for example, when writing data through a mutable pointer (ptr::write(), slice::from_raw_parts_mut(), etc), the pointer must be obtained from a writable location (provenance). you can either get a mutable pointer before splitting the buffer into reader and writer pair, like how spsc-bip-buffer is implemented:

or you can put the backing storage behind an UnsafeCell, like how ringbuffer-spsc is implemented:

Alternatively, if you're like me and aren't great at keeping up with blog posts/RSS feeds, I find the This Week in Rust newsletter to be a good read and it always points to the new release blog post when it comes out.
