Shared buffer crate

I am looking for a struct that can be shared between two threads. One thread will use std::io::Write to write data to the buffer, while the other will use std::io::Read to read the data written by the first.

I've been searching for a few hours now, but can't find a crate which currently does this. Am I overlooking any crates?

I think the ringbuf crate is what you need.


A lot of other languages use the term "pipe" for this. For example, Go's standard library has the io.Pipe() function which returns a reader and a writer.

The pipe crate seems to provide something similar. Skimming the API docs, it looks like it's implemented by transferring Vec<u8> chunks from the sender to the receiver via a channel.
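To make the "chunks over a channel" idea concrete, here is a minimal sketch built only on the standard library. It is not the pipe crate's actual code: the names ChunkWriter, ChunkReader, chunk_pipe and round_trip are made up for illustration, and the queue bound of 4 is an arbitrary assumption.

```rust
use std::io::{self, Read, Write};
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::thread;

/// Writing half (hypothetical): each `write` call sends one chunk of bytes.
struct ChunkWriter {
    tx: SyncSender<Vec<u8>>,
}

/// Reading half (hypothetical): holds the chunk currently being consumed.
struct ChunkReader {
    rx: Receiver<Vec<u8>>,
    current: Vec<u8>,
    pos: usize,
}

/// `bound` is the number of chunks that may sit in the queue at once.
fn chunk_pipe(bound: usize) -> (ChunkReader, ChunkWriter) {
    let (tx, rx) = sync_channel(bound);
    (ChunkReader { rx, current: Vec::new(), pos: 0 }, ChunkWriter { tx })
}

impl Write for ChunkWriter {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        if buf.is_empty() {
            return Ok(0);
        }
        // Blocks while the queue is full; fails once the reader is gone.
        self.tx
            .send(buf.to_vec())
            .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "reader dropped"))?;
        Ok(buf.len())
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

impl Read for ChunkReader {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        // Fetch the next chunk once the current one is used up.
        while self.pos == self.current.len() {
            match self.rx.recv() {
                Ok(chunk) => {
                    self.current = chunk;
                    self.pos = 0;
                }
                Err(_) => return Ok(0), // writer dropped: report end of data
            }
        }
        let n = buf.len().min(self.current.len() - self.pos);
        buf[..n].copy_from_slice(&self.current[self.pos..self.pos + n]);
        self.pos += n;
        Ok(n)
    }
}

/// Writes `count` lines on one thread and reads them all back on another.
fn round_trip(count: u32) -> io::Result<String> {
    let (mut reader, mut writer) = chunk_pipe(4);
    let producer = thread::spawn(move || -> io::Result<()> {
        for i in 1..=count {
            writeln!(writer, "DATA{i:02}")?;
        }
        Ok(()) // `writer` is dropped here, which the reader sees as EOF
    });
    let mut out = String::new();
    reader.read_to_string(&mut out)?;
    producer.join().expect("writer thread panicked")?;
    Ok(out)
}

fn main() -> io::Result<()> {
    print!("{}", round_trip(3)?);
    Ok(())
}
```

Dropping the writer is what signals end of data in this sketch, so no sentinel value is needed in the stream itself.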

The difference between pipe and ringbuf seems to be that ringbuf very deliberately makes the capacity fixed, whereas pipe seems to give you less control over how much memory is used and should enable backpressure (i.e. when your reader can't read data as fast as it's being written, the writer goes to sleep until it's able to write again).
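As a rough illustration of backpressure in general (not of how either crate implements it), a bounded channel from the standard library stops accepting data once its queue is full. The helper name chunks_until_full and the 16-byte chunk size are assumptions made for this sketch; it uses try_send so the "full" state can be observed without blocking, whereas a plain send would put the writer to sleep at that point.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Counts how many chunks a bounded queue accepts while nobody is reading.
fn chunks_until_full(bound: usize) -> usize {
    // `_rx` is kept alive so the channel does not count as disconnected.
    let (tx, _rx) = sync_channel::<Vec<u8>>(bound);
    let mut accepted = 0;
    loop {
        match tx.try_send(vec![0u8; 16]) {
            Ok(()) => accepted += 1,
            // A blocking `send` would wait here until the reader catches up.
            Err(TrySendError::Full(_)) => return accepted,
            Err(TrySendError::Disconnected(_)) => unreachable!("receiver is still alive"),
        }
    }
}

fn main() {
    println!("queue accepted {} chunks before filling up", chunks_until_full(3));
}
```

The bound is counted in chunks rather than bytes, so the actual memory in flight still depends on how large each chunk is.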


If you don't need to support Windows, UnixStream::pair() might be helpful:

use std::{
    io,
    io::{BufRead, BufReader, Write},
    os::unix::net::UnixStream,
    thread,
    time::Duration,
};

fn main() -> io::Result<()> {
    let mut ht = Vec::new();

    // Sorry, not for Windows
    let (receiver, mut sender) = UnixStream::pair()?;

    // Reader
    ht.push(thread::spawn(move || -> io::Result<()> {
        let mut x = String::new();
        let mut rdr = BufReader::new(receiver);
        loop {
            x.clear();
            rdr.read_line(&mut x)?;
            let l = x.trim();
            if l.is_empty() {
                println!("RCV: End of data");
                return Ok(());
            }
            println!("RCV: '{l}'");
        }
    }));

    // Writer
    ht.push(thread::spawn(move || -> io::Result<()> {
        for x in 1 .. 10 {
            println!("SND: 'DATA{x:02}'");
            writeln!(sender, "DATA{x:02}")?;
            thread::sleep(Duration::from_millis(100));
        }
        println!("SND: End of data");
        // An empty line tells the reader to stop
        writeln!(sender)?;
        Ok(())
    }));

    // Collect threads
    for h in ht {
        match h.join().unwrap() {
            Ok(_) => {}
            Err(e) => {
                println!("Terminated with error: {}", e);
            }
        }
    }
    Ok(())
}

I haven't seen any mention of channels. The easiest examples to point to are the ones in the standard library (std::sync::mpsc), but crossbeam has some pretty fast ones.

The nice thing about channels is that you can use them to send pretty much any value of type T: Send, which includes e.g. Vec<u8> and String.
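For reference, a small sketch of the channel approach with the standard library's mpsc module, sending String values from one thread to another. The function name collect_messages and the message contents are made up for the example.

```rust
use std::sync::mpsc;
use std::thread;

/// Sends three strings from a producer thread and collects them on the caller's thread.
fn collect_messages() -> Vec<String> {
    let (tx, rx) = mpsc::channel::<String>();
    let producer = thread::spawn(move || {
        for i in 1..=3 {
            tx.send(format!("DATA{i:02}")).expect("receiver hung up");
        }
        // `tx` is dropped here, which ends the receiver's iteration.
    });
    // Iterating blocks for each message until every sender is gone.
    let received: Vec<String> = rx.iter().collect();
    producer.join().expect("producer thread panicked");
    received
}

fn main() {
    for msg in collect_messages() {
        println!("RCV: '{msg}'");
    }
}
```

The same pattern works for Vec<u8> or any other T: Send by changing the channel's type parameter.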

pipe ended up doing it for me. In my case it works better than ringbuf for two reasons:

  1. ringbuf panics when it is full so I have to oversize it
  2. ringbuf's read() sometimes errors with WouldBlock which I would have to work around by manually sleeping to wait for enough data to be written. This seems to be related to how the library that uses the Reader is written.
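The "sleep and retry" workaround from the second point could look roughly like the wrapper below. This is only a sketch: RetryReader, Flaky and read_through_stalls are hypothetical names, and Flaky merely imitates a non-blocking source by failing with WouldBlock a fixed number of times; it is not ringbuf's API.

```rust
use std::io::{self, Read};
use std::thread;
use std::time::Duration;

/// Wraps a reader and retries after a short sleep whenever it reports WouldBlock.
struct RetryReader<R> {
    inner: R,
    pause: Duration,
}

impl<R: Read> Read for RetryReader<R> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        loop {
            match self.inner.read(buf) {
                Err(e) if e.kind() == io::ErrorKind::WouldBlock => thread::sleep(self.pause),
                other => return other, // data, EOF, or any other error
            }
        }
    }
}

/// Stand-in source: fails with WouldBlock `stalls` times, then yields its data.
struct Flaky {
    stalls: u32,
    data: io::Cursor<Vec<u8>>,
}

impl Read for Flaky {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        if self.stalls > 0 {
            self.stalls -= 1;
            return Err(io::ErrorKind::WouldBlock.into());
        }
        self.data.read(buf)
    }
}

/// Reads `payload` to the end through a source that stalls `stalls` times first.
fn read_through_stalls(stalls: u32, payload: &str) -> io::Result<String> {
    let source = Flaky { stalls, data: io::Cursor::new(payload.as_bytes().to_vec()) };
    let mut reader = RetryReader { inner: source, pause: Duration::from_millis(1) };
    let mut out = String::new();
    reader.read_to_string(&mut out)?;
    Ok(out)
}

fn main() -> io::Result<()> {
    println!("{}", read_through_stalls(3, "hello")?);
    Ok(())
}
```

One caveat with this approach: if the underlying source keeps returning WouldBlock after the writer has finished, the loop never ends, so a real version would likely need a separate end-of-data signal.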

My go-to would have been channels. Unfortunately, I have to connect two libraries (cargo and cargo_metadata) which expect a Write and a Read respectively.

Thanks for all the input everyone!
