Hi everyone, I'm new to Rust. I'm trying to write a TCP Proxy as part of learning the language and ecosystem, and it's been great so far. However, I've hit a stumbling block when trying to implement pipelining into the proxy.
The idea is that each client can pipeline any arbitrary number of requests in one connection, and the proxy will forward those to the upstream server, and write back the responses to the client as it receives them. My current implementation is as such:
async fn handle_conn<R, W>(
mut req_stream: FramedRead<R, ReqDecoder>,
mut res_writer: W,
upstream: Arc<UpstreamServer>,
) -> Result<(), Error>
where
R: AsyncRead + Unpin,
W: AsyncWrite + Unpin,
{
loop {
match req_stream.next().await {
Some(Ok(request)) => {
let response: Bytes = upstream.handle(request);
res_writer.write(response.as_ref()).await?;
}
...
None => {
info!("No more requests");
return Ok(())
}
}
}
}
This implementation decodes a request, does processing, then writes back the response before decoding the next request. Not a problem for singular requests, but when the client pipelines a large number of them, the many awaits on write degrade performance. I can't do read_to_end
to process everything and flush all responses back at once either, since the client won't close the conn until it receives responses for all its requests or an error.
What I want is something more akin to:
async fn handle_conn<R, W>(
mut req_stream: FramedRead<R, ReqDecoder>,
mut res_writer: W,
server: Arc<Server>,
) -> Result<(), Error>
where
R: AsyncRead + Unpin,
W: AsyncWrite + Unpin,
{
// Imaginary type that returns ready when there is data to be written AND the underlying
// writer is ready to write
let res_buf = BufWriter::new(res_writer);
loop {
select! {
request = req_stream.next() => {
Some(Ok(request)) => {
let response: Bytes = server.handle(request);
res_buf.write(response.as_ref()).await?;
}
...
None => {
info!("No more requests");
break;
}
}
// Shouldn't be selected unless there was a previous call to res_buf.write()
written = res_buf.flush() => {
if written == 0 {
break;
}
}
}
}
res_buf.flush().await?;
}
Is this something that is possible, or am I thinking about this wrongly? I thought of implementing my own custom BufWriter and future to achieve this, but I'm not sure if there is already something out there that accomplishes what I would assume is a common pattern?
Thanks a lot in advance and sorry if there are already resources out there that address this! I've been scouring the net for days and found some promising leads but nothing seems to exactly fit what I need here.