See the docs here: pipebuf - Rust
At work we needed a more efficient way to handle processing chains, e.g. TCP stream ↔ TLS ↔ Websocket-handler, or Protocol-encoder → Compressor → Stream-encapsulator, etc. The problem is that both Read and Write typically result in copying data. PipeBuf provides a buffer that is shared between both the consumer and producer of data, so the data doesn't have to be copied. We're already seeing efficiency gains from using this. The consumer can use it as its input buffer, i.e. leave stuff in there until it has a full record / line / block. So the consumer doesn't need its own private buffer. The semantics of PipeBuf are modelled after TCP, i.e. abort and close are different, and there is a "push" flag that some consumers may make use of. There are interfaces to/from Read and Write for backward compatibility. The API is designed to eliminate as many foot-guns as possible, and to be very easy to code against.
To go with the core pipebuf crate, there are also wrapper crates that make it easier to use various other crates:
- pipebuf_rustls reduces all of the complexity of the Rustls API down to a single
process
call. - pipebuf_mio supports connecting PipeBufs up to a MIO TCP stream or UNIX socket stream
- pipebuf_websocket provides a simplified wrapper around embedded-websocket
This is the first open-source release of PipeBuf. We're using it internally at work, and it has tests there, but I need to add a lot of tests to the crate itself, and some examples. The core pipebuf crate API can be considered pretty stable now, though, since it has already been through a couple of iterations. However there are lots of bits missing from the wrapper crates, e.g. client-side WS or TLS handling, and tests/examples. There are also a couple more wrapper crates I plan to get out when I can.
I'd really like to see an ecosystem build around this. It should be easy for anyone to write a wrapper crate, e.g. typically 100 LOC to wrap something. Also, if low-level crates like Rustls or whatever could adopt this interface or a compatible slice-based interface (see notes in the docs), that could really save some cycles on all that copying.
This could save a lot of work for some projects too. For example, tiny_http
had to add Rustls support, but last I checked they were still one version behind. They were originally limited to just TCP, but then someone needed UNIX sockets and got that added. So slowly it's becoming an "everything but the kitchen sink" situation. If instead tiny_http
was built around a core HTTP protocol handler using PipeBuf as an interface, then if someone needed UNIX sockets or a different version of Rustls, then they could easily put that together themselves. No hassle for tiny_http
devs! Also, if it had a PipeBuf-based interface, then someone could take that core protocol handling and put it into an event-driven system, or async/await, or actors or whatever. So its use wouldn't be limited to the default blocking handling it has right now.
So there is a lot of potential here if this catches people's interest. I've opened the Discussion section on the GitHub if anyone wants to discuss any ideas.
Here's some example code which handles a short WSS processing chain. All the complexity of interfacing to Rustls or the Websocket crate is handled by the wrapper crates and appears in the user code as just a process
or receive
call:
let mut activity = true;
while activity {
activity = false;
match self.tcplink.process(&mut self.source, self.crypt.lower()) {
Err(e) => return fail!(cx, "Websocket TCP connection failed: {}", e),
Ok(true) => activity = true,
Ok(false) => (),
}
match self.tls.process(self.crypt.upper(), self.plain.lower()) {
Err(e) => return fail!(cx, "Websocket TLS error: {}", e),
Ok(true) => activity = true,
Ok(false) => (),
}
match self.ws.receive(self.plain.upper(), self.in_msg.wr(), &mut self.in_is_text) {
Err(e) => return fail!(cx, "Websocket protocol error: {:?}", e),
Ok(true) => {
activity = true;
if self.in_msg.rd().is_eof() {
if self.in_is_text {
self.incoming_text(cx);
} else {
return fail!(cx, "Not expecting binary messages");
}
self.in_msg.reset();
}
}
Ok(false) => (),
}
}