Buffering writes for a pipelined request-response pattern

Hi everyone, I'm new to Rust. I'm trying to write a TCP proxy as part of learning the language and ecosystem, and it's been great so far. However, I've hit a stumbling block trying to implement pipelining in the proxy.

The idea is that each client can pipeline an arbitrary number of requests in one connection, and the proxy will forward those to the upstream server and write the responses back to the client as it receives them. My current implementation looks like this:

async fn handle_conn<R, W>(
    mut req_stream: FramedRead<R, ReqDecoder>,
    mut res_writer: W,
    upstream: Arc<UpstreamServer>,
) -> Result<(), Error>
where
    R: AsyncRead + Unpin,
    W: AsyncWrite + Unpin,
{
    loop {
        match req_stream.next().await {
            Some(Ok(request)) => {
                let response: Bytes = upstream.handle(request);
                // write_all instead of write: a plain write may only
                // accept part of the buffer
                res_writer.write_all(response.as_ref()).await?;
            }
            ...
            None => {
                info!("No more requests");
                return Ok(())
            }
        }
    }
}

This implementation decodes a request, processes it, then writes back the response before decoding the next request. That's fine for a single request, but when the client pipelines a large number of them, awaiting each small write separately degrades performance. I can't use read_to_end to process everything and flush all the responses back at once either, since the client won't close the connection until it has received responses for all its requests, or an error.
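To put a rough number on what buffering buys, here's a synchronous, std-only sketch (blocking I/O, and `CountingWriter` is a made-up stand-in for the socket, counting how many write calls reach the underlying sink):

```rust
use std::io::{BufWriter, Write};

/// Hypothetical sink for illustration: counts how many times `write`
/// is actually called on it (a stand-in for syscalls on a socket).
struct CountingWriter {
    calls: usize,
}

impl Write for CountingWriter {
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        self.calls += 1;
        Ok(buf.len())
    }
    fn flush(&mut self) -> std::io::Result<()> {
        Ok(())
    }
}

/// Write `n` small responses, with or without buffering, and return
/// how many `write` calls reached the underlying sink.
fn count_writes(buffer: bool, n: usize) -> std::io::Result<usize> {
    if buffer {
        let mut w = BufWriter::new(CountingWriter { calls: 0 });
        for _ in 0..n {
            w.write_all(b"response")?;
        }
        // `into_inner` flushes whatever is still buffered.
        Ok(w.into_inner()?.calls)
    } else {
        let mut w = CountingWriter { calls: 0 };
        for _ in 0..n {
            w.write_all(b"response")?;
        }
        Ok(w.calls)
    }
}

fn main() -> std::io::Result<()> {
    println!("unbuffered: {} calls", count_writes(false, 1000)?);
    println!("buffered:   {} calls", count_writes(true, 1000)?);
    Ok(())
}
```

The unbuffered version hits the sink once per response; the buffered one coalesces them into a handful of calls. The catch, of course, is knowing when to flush, which is exactly the problem below.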

What I want is something more akin to:

async fn handle_conn<R, W>(
    mut req_stream: FramedRead<R, ReqDecoder>,
    mut res_writer: W,
    server: Arc<Server>,
) -> Result<(), Error>
where
    R: AsyncRead + Unpin,
    W: AsyncWrite + Unpin,
{
    // Imaginary type that returns ready when there is data to be written AND the underlying
    // writer is ready to write
    let mut res_buf = BufWriter::new(res_writer);
    loop {
        select! {
            request = req_stream.next() => match request {
                Some(Ok(request)) => {
                    let response: Bytes = server.handle(request);
                    res_buf.write_all(response.as_ref()).await?;
                }
                ...
                None => {
                    info!("No more requests");
                    break;
                }
            },
            // Shouldn't be selected unless there was a previous call to res_buf.write()
            written = res_buf.flush() => {
                if written == 0 {
                    break;
                }
            }
        }
    }
    res_buf.flush().await?;
    Ok(())
}

Is this possible, or am I thinking about it the wrong way? I thought about implementing my own custom BufWriter and future to achieve this, but I'm not sure whether something already exists for what I'd assume is a common pattern.

Thanks a lot in advance and sorry if there are already resources out there that address this! I've been scouring the net for days and found some promising leads but nothing seems to exactly fit what I need here.

This seems like the perfect use case for stream_reader, which is relatively rarely used!

use tokio::io::{stream_reader, StreamExt};

// This creates a `Stream<Item = Result<Bytes, _>>`
// The outer call is `StreamExt::map` and the inner is `Result::map`
let stream = req_stream.map(move |res| res.map(|req| upstream.handle(req)));

// Turn it into an `AsyncRead`.
let reader = stream_reader(stream);

// Copy data from the `AsyncRead` to the `AsyncWrite`
tokio::io::copy(&mut reader, &mut res_writer).await?;

The reason this should work is that copy reads and writes concurrently unless its internal 2048-byte buffer fills up, in which case it pauses reading until some of the buffered data has been written out.
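If it helps intuition, the bounded-buffer idea looks roughly like this synchronous sketch (a simplified, blocking stand-in I wrote for illustration; the real copy interleaves reads and writes with poll instead of blocking, but the backpressure bound is the same):

```rust
use std::io::{Read, Write};

/// Simplified sketch of a bounded-buffer copy: never holds more than
/// `BUF` bytes in flight, so a slow writer naturally throttles reading.
fn bounded_copy<R: Read, W: Write>(mut r: R, mut w: W) -> std::io::Result<u64> {
    const BUF: usize = 2048; // same bound mentioned above
    let mut buf = [0u8; BUF];
    let mut total = 0u64;
    loop {
        // Read at most BUF bytes; in the async version this is where
        // reading pauses while the buffer is still full.
        let n = r.read(&mut buf)?;
        if n == 0 {
            return Ok(total); // EOF: everything has been forwarded
        }
        // Drain the buffer completely before reading again.
        w.write_all(&buf[..n])?;
        total += n as u64;
    }
}

fn main() -> std::io::Result<()> {
    let mut out = Vec::new();
    let n = bounded_copy(&b"hello world"[..], &mut out)?;
    println!("copied {} bytes", n);
    Ok(())
}
```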


That's awesome, it works perfectly! To be honest, the FP style .map, .and_then, and other stuff really confuses me a lot and I don't think I would've gotten the solution without your help. Thank you so much!

In that case you might like the async-stream crate.

use async_stream::try_stream;

let stream = try_stream! {
    while let Some(item) = req_stream.next().await {
        let response: Bytes = server.handle(item?);
        yield response;
    }
};
let reader = stream_reader(stream);
tokio::pin!(reader);

tokio::io::copy(&mut reader, &mut res_writer).await?;

It will introduce some tokio::pin!s you wouldn't need otherwise, but it's pretty cool. I didn't test it, but it should be rather close to the truth.

Oh that's awesome, it looks much more understandable to me!

I originally had another question about how we might implement zero-copy from upstream to client, but I think that's impossible if I want to process read -> handle and response -> write concurrently, since we would have to call write for each disjoint byte slice, which is how we receive them from the upstream server.
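For anyone finding this later: one partial mitigation I came across for the disjoint-slice problem is vectored I/O. It's not zero-copy, but it batches several slices into one call instead of one write per slice. A std-only sketch, assuming `Write::write_vectored` (tokio's AsyncWrite also has a vectored variant in newer versions; the `writev_once` helper and the sample parts are mine, for illustration):

```rust
use std::io::{IoSlice, Write};

/// Submit several disjoint slices to `w` in a single vectored write
/// (writev(2) on a real socket). Returns how many bytes the one call
/// accepted. NOTE: like `write`, it may accept fewer bytes than
/// offered, so production code must loop until everything is written.
fn writev_once<W: Write>(w: &mut W, parts: &[&[u8]]) -> std::io::Result<usize> {
    let slices: Vec<IoSlice> = parts.iter().map(|p| IoSlice::new(p)).collect();
    w.write_vectored(&slices)
}

fn main() -> std::io::Result<()> {
    // Disjoint slices as they might arrive from upstream.
    let parts: [&[u8]; 3] = [b"HTTP/1.1 200 OK\r\n", b"len: 2\r\n\r\n", b"ok"];
    let mut sink: Vec<u8> = Vec::new();
    let n = writev_once(&mut sink, &parts)?;
    println!("wrote {} bytes in one call", n);
    Ok(())
}
```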

Once again, thank you so much for your help!


This topic was automatically closed 90 days after the last reply. New replies are no longer allowed.