PipeBuf: a more efficient alternative to Read and Write

See the docs here: pipebuf - Rust

At work we needed a more efficient way to handle processing chains, e.g. TCP stream ↔ TLS ↔ Websocket-handler, or Protocol-encoder → Compressor → Stream-encapsulator, etc. The problem is that both Read and Write typically result in copying data. PipeBuf provides a buffer that is shared between the producer and the consumer of data, so the data doesn't have to be copied. We're already seeing efficiency gains from using this.

The consumer can use the PipeBuf as its input buffer, i.e. leave data in there until it has a full record / line / block, so it doesn't need a private buffer of its own. The semantics of PipeBuf are modelled on TCP, i.e. abort and close are distinct, and there is a "push" flag that some consumers may make use of. There are interfaces to/from Read and Write for backward compatibility. The API is designed to eliminate as many foot-guns as possible, and to be very easy to code against.
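Roughly, the consumer-side pattern looks like this (a minimal sketch only; the PBufRd method names data and consume are assumptions from memory, so check the pipebuf docs for the real API):

use pipebuf::PBufRd;

// Hypothetical application callback, just for illustration.
fn handle_line(line: &[u8]) {
    println!("line: {}", String::from_utf8_lossy(line));
}

// Pull complete lines straight out of the shared buffer; any partial line
// stays in the buffer until more data arrives, with no copying.
// NOTE: `data` and `consume` are assumed method names -- check the docs.
fn process_lines(mut inp: PBufRd<'_>) {
    while let Some(pos) = inp.data().iter().position(|&b| b == b'\n') {
        handle_line(&inp.data()[..pos]);
        inp.consume(pos + 1); // drop the handled line (and its newline)
    }
}

The point is that an incomplete line simply stays where it is in the shared buffer; nothing gets copied into a second buffer just to wait for the rest of it.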

To go with the core pipebuf crate, there are also wrapper crates that make it easier to use various other crates:

  • pipebuf_rustls reduces all of the complexity of the Rustls API down to a single process call.
  • pipebuf_mio supports connecting PipeBufs up to a MIO TCP stream or UNIX socket stream.
  • pipebuf_websocket provides a simplified wrapper around embedded-websocket.

This is the first open-source release of PipeBuf. We're using it internally at work, and it has tests there, but I need to add a lot of tests to the crate itself, and some examples. The core pipebuf crate API can be considered pretty stable now, though, since it has already been through a couple of iterations. However there are lots of bits missing from the wrapper crates, e.g. client-side WS or TLS handling, and tests/examples. There are also a couple more wrapper crates I plan to get out when I can.

I'd really like to see an ecosystem build around this. It should be easy for anyone to write a wrapper crate, e.g. typically 100 LOC to wrap something. Also, if low-level crates like Rustls or whatever could adopt this interface or a compatible slice-based interface (see notes in the docs), that could really save some cycles on all that copying.
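To give a feel for the shape of a wrapper, here's a toy component (purely illustrative, not any real crate's API; the PBufRd/PBufWr method names data, consume and append are assumptions):

use pipebuf::{PBufRd, PBufWr};

// Toy stand-in for a wrapper component: upper-cases ASCII while copying from
// its input pipe to its output pipe.  Real wrappers keep the wrapped
// library's state in the struct and drive it from process().
// NOTE: `data`, `consume` and `append` are assumed method names.
pub struct Upcase;

impl Upcase {
    // Returns true if any bytes were moved.
    pub fn process(&mut self, mut inp: PBufRd<'_>, mut out: PBufWr<'_>) -> bool {
        let len = inp.data().len();
        if len == 0 {
            return false; // nothing to do; EOF propagation omitted for brevity
        }
        let upper: Vec<u8> = inp.data().iter().map(|b| b.to_ascii_uppercase()).collect();
        out.append(&upper);
        inp.consume(len);
        true
    }
}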

This could save a lot of work for some projects too. For example, tiny_http had to add Rustls support, but last I checked they were still one version behind. They were originally limited to just TCP, but then someone needed UNIX sockets and got that added. So slowly it's becoming an "everything but the kitchen sink" situation. If instead tiny_http were built around a core HTTP protocol handler using PipeBuf as an interface, then anyone who needed UNIX sockets or a different version of Rustls could easily put that together themselves. No hassle for the tiny_http devs! Also, with a PipeBuf-based interface, someone could take that core protocol handling and put it into an event-driven system, or async/await, or actors or whatever, so its use wouldn't be limited to the default blocking handling it has right now.

So there is a lot of potential here if this catches people's interest. I've opened the Discussions section on the GitHub repo if anyone wants to discuss any ideas.

Here's some example code which handles a short WSS processing chain. All the complexity of interfacing to Rustls or the Websocket crate is handled by the wrapper crates and appears in the user code as just a process or receive call:

// Run each stage of the chain in turn, repeating until nothing makes progress
let mut activity = true;
while activity {
    activity = false;
    // TCP socket ↔ encrypted pipe-pair
    match self.tcplink.process(&mut self.source, self.crypt.lower()) {
        Err(e) => return fail!(cx, "Websocket TCP connection failed: {}", e),
        Ok(true) => activity = true,
        Ok(false) => (),
    }
    // Rustls: encrypted pipe-pair ↔ plaintext pipe-pair
    match self.tls.process(self.crypt.upper(), self.plain.lower()) {
        Err(e) => return fail!(cx, "Websocket TLS error: {}", e),
        Ok(true) => activity = true,
        Ok(false) => (),
    }
    // Websocket: decode incoming frames from the plaintext pipe into in_msg
    match self.ws.receive(self.plain.upper(), self.in_msg.wr(), &mut self.in_is_text) {
        Err(e) => return fail!(cx, "Websocket protocol error: {:?}", e),
        Ok(true) => {
            activity = true;
            // EOF on in_msg marks the end of a complete incoming message
            if self.in_msg.rd().is_eof() {
                if self.in_is_text {
                    self.incoming_text(cx);
                } else {
                    return fail!(cx, "Not expecting binary messages");
                }
                self.in_msg.reset();
            }
        }
        Ok(false) => (),
    }
}

It sounds very interesting! I have some questions:

  1. How does it work internally?
  2. Any async/await support?
  3. Is it worth it for this use case, where I avoid cloning and the only link between TCP and my protocol is TLS, but Rustls uses internal buffering?

Internally it is a Vec with three segments:

  • already-processed data waiting to be discarded
  • data waiting to be consumed
  • free space available for a producer to write into

You could look at the code. It's not complicated. It's just a non-circular buffer that gets compacted from time to time. However, the key isn't so much the nature of the buffer, but rather all the API around it and how it is designed to be used, i.e. being passed to a component's processing call by reference rather than being owned by the component. Also, a lot of care has gone into EOF handling, which ends up simplifying things for components and avoiding code duplication, even though that isn't immediately obvious.
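As a rough mental model only (not the actual pipebuf source), it's something like:

// Simplified mental model of the layout, not the real implementation.
struct Model {
    buf: Vec<u8>, // backing storage
    rd: usize,    // buf[..rd]    already-processed data, waiting to be discarded
    wr: usize,    // buf[rd..wr]  data waiting to be consumed
                  // buf[wr..]    free space for a producer to write into
}

impl Model {
    // Occasionally drop the consumed region and shift the rest back to the
    // start, which is what keeps the buffer non-circular.
    fn compact(&mut self) {
        self.buf.drain(..self.rd);
        self.wr -= self.rd;
        self.rd = 0;
    }
}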

This is lower level than the runtime. It's more like a primitive. So it could work with blocking or non-blocking, event-driven, bare mio, actors, futures, async/await, bare metal, or whatever. So you could await an input source, and then move the data to a PipeBuf, then run your processing, and output the result, all under async/await. Or do something similar under any other kind of runtime.
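For instance, here is a very rough, untested sketch of a one-direction loop under Tokio (assuming a std-feature PipeBuf::new() constructor and append/data/consume-style methods -- check the docs for the real names -- with run_chain standing in for whatever chain of wrapper calls you'd actually run):

use pipebuf::{PBufRd, PBufWr, PipeBuf};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;

// Stand-in for a real processing chain (e.g. the TLS + websocket wrappers);
// here it just forwards bytes unchanged.
fn run_chain(mut inp: PBufRd<'_>, mut out: PBufWr<'_>) {
    let len = inp.data().len();
    out.append(inp.data());
    inp.consume(len);
}

// Untested sketch: await bytes from the socket, feed them into a PipeBuf,
// run the chain, then write out whatever it produced.
async fn pump(mut sock: TcpStream) -> std::io::Result<()> {
    let mut inp = PipeBuf::new(); // assumes the std-feature constructor
    let mut out = PipeBuf::new();
    let mut tmp = [0u8; 4096];
    loop {
        let n = sock.read(&mut tmp).await?;
        if n == 0 {
            break; // peer closed; EOF handling on the pipes omitted for brevity
        }
        inp.wr().append(&tmp[..n]); // assumed append-style call
        run_chain(inp.rd(), out.wr());
        // Copy to a local Vec just to keep the sketch simple
        let ready: Vec<u8> = out.rd().data().to_vec();
        out.rd().consume(ready.len());
        if !ready.is_empty() {
            sock.write_all(&ready).await?;
        }
    }
    Ok(())
}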

Rustls right now always does internal buffering, and its input/output is via Read/Write. The pipebuf_rustls wrapper maps that onto PipeBufs. There is copying involved here, but no worse than any other way of interfacing to Rustls if you're going to have to buffer locally anyway to do your protocol handling. Using the wrapper does simplify the interface to TLS though. If you already have code to talk to Rustls, the only benefits might be in code clarity and composability, and it's up to you whether you consider that worth it.


I expected pipebuf::PBufRd to implement AsyncRead or something like that.

This implementation could be hidden behind a feature flag or in a separate crate like async-pipebuf.

... You could await an input source, and then move the data to a PipeBuf, then run your processing, and output the result, all under async/await. Or do something similar under any other kind of runtime.

Sounds very complicated, is there any async example or something?

I'm not sure that AsyncRead is what you want. The PipeBuf itself is just a buffer, like a Vec is just a variable array, and a HashMap is just a map. None of them knows where their data is coming from. So a PipeBuf can't schedule the operations required to fill itself, because you (the owner) are in charge of that.

Perhaps you could point me to an example of some async/await code handling streams of bytes and I could show how to fit a PipeBuf processing chain into that.

I'm not sure that AsyncRead is what you want.

I am asking for something like a wrapper, for example async-tls or tokio-rustls, which provides an interface supporting async/await syntax.

A function takes &mut impl AsyncRead. Does this function signature need to change to support pipebuf?

Looking at those wrappers makes me realize how overcomplicated async/await has made everything. It's as if trying to make everything fit async/await has broken people's ability to think about the problem. The tokio_rustls wrapper is not even full duplex according to the docs. The whole thing seems crazy, but I guess this is what people are using.

Really, for a chain like websocket ↔ TLS ↔ TCP, you need to handle input from two ends -- both new websocket messages being sent and new data arriving from TCP -- at the same time, in any order, i.e. with no predictable ordering that would let you handle things sequentially. So that's two events that need to wake things up and cause processing. (Actually maybe three if TCP writing has backpressure.) await only waits for one thing at a time. How are you going to structure waiting for two things? Is that select!? Or have two tasks each waiting for one thing and sharing a PipeBuf-based chain via Arc<Mutex<...>>? I guess I need to study current async/await idioms more before I can suggest anything.

This isn't a limitation of PipeBuf, which can handle multiple wakeups and inputs from different directions, nor is it a limitation of other runtimes like actor-based or event-driven ones. It's not a limitation if you're handling mio Ready events directly either. It's just that async/await makes this so much harder. (I code against Stakker most of the time, so I don't have to worry about all of this.)

Anyway, several people have requested examples, so I'll put async/await on the list and see if I can create an example for that. In the meantime, if you want to explore this further, and maybe help me come up with a solution, we could start a discussion on the pipebuf GitHub (linked above). Or if anyone else can comment on how async/await or Tokio handles truly bidirectional (full-duplex) streams, that might save us some time.