I have a reverse proxy developed basing on hyper v.0.13 and tokio v0.2, recently I am encountering potential performance issue and trying to improve it by introducing tokio::io::BufStream
in upstreams.
Here is the breif code of the server
pub async fn run(port:u16, context : HttpRuntimeContext) -> Result<(), hyper::error::Error> {
let make_service = make_service_fn(move |_| {
let context = context.clone();
async move {
Ok::<_, hyper::Error>(service_fn( move |mut req| {
let context = context.clone();
async move {
//...
let client = get_client(&addr);
client.request(req).await
}
}))
}
});
let addr = SocketAddr::from(([0, 0, 0, 0], port));
let server = Server::bind(&addr)
.http1_max_buf_size(1024*1024)
.tcp_keepalive(Some(Duration::from_secs(300)))
.serve(make_service);
server.await?;
Ok(())
}
// pooling connections
lazy_static! {
static ref HTTP_CLIENTS : RwLock<HashMap<String, Arc<Client<UpstreamConnector>>>> = RwLock::new(HashMap::new());
}
/// get or create an hyper::Client for target server address
fn get_client(server_addr : &str) -> Arc<Client<UpstreamConnector>> {
{
let reader = HTTP_CLIENTS.read().unwrap();
if let Some(client) = reader.get(server_addr) {
return client.clone();
}
}
let connector = UpstreamConnector::new(&server_addr);
let client = Client::builder()
.pool_idle_timeout(Duration::from_secs(30))
.pool_max_idle_per_host(25)
.build(connector);
let client = Arc::new(client);
{
let mut writer = HTTP_CLIENTS.write().unwrap();
writer.insert( server_addr.to_string(), client.clone());
}
client
}
UpstreamConnector
is used to established a TCP connection to backend.
impl Service<Uri> for UpstreamConnector {
type Response = BufferedTcpStream;
type Error = std::io::Error;
type Future = Pin<Box<
dyn Future<Output = Result<Self::Response, Self::Error>> + Send
>>;
fn poll_ready(&mut self, _: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
// This connector is always ready, but others might not be.
Poll::Ready(Ok(()))
}
fn call(&mut self, _: Uri) -> Self::Future {
let addr = self.target.clone();
Box::pin(async move {
let stream = TcpStream::connect(addr).await?;
_ = stream.set_send_buffer_size(512*1024);
_ = stream.set_recv_buffer_size(512*1024);
return Ok(BufferedTcpStream(BufStream::with_capacity(1024*1024, 1024*1024, stream)));
})
}
}
The BufferedTcpStream
was tokio::io::TcpStream
, I replaced it with my own BufStream<TcpStream>
as below
use tokio::prelude::*;
use tokio::net::TcpStream;
use tokio::io::BufStream;
pub struct BufferedTcpStream(BufStream<TcpStream>);
impl AsyncRead for BufferedTcpStream {
unsafe fn prepare_uninitialized_buffer(&self, _: &mut [std::mem::MaybeUninit<u8>]) -> bool {
false
}
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut core::task::Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
Pin::new(self.0.get_mut()).poll_read(cx, buf)
}
}
impl AsyncWrite for BufferedTcpStream {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut core::task::Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
Pin::new(self.0.get_mut()).poll_write(cx, buf)
}
fn poll_write_buf<B: Buf>(
mut self: Pin<&mut Self>,
cx: &mut core::task::Context<'_>,
buf: &mut B,
) -> Poll<io::Result<usize>> {
Pin::new(self.0.get_mut()).poll_write_buf(cx, buf)
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut core::task::Context<'_>) -> Poll<io::Result<()>> {
Pin::new(self.0.get_mut()).poll_flush(cx)
}
fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut core::task::Context<'_>) -> Poll<io::Result<()>> {
Pin::new(self.0.get_mut()).poll_shutdown(cx)
}
}
impl hyper::client::connect::Connection for BufferedTcpStream {
fn connected(&self) -> Connected {
self.0.get_ref().connected()
}
}
Questions :
- Any glaring issue?
- Should I set the hyper::client::executor to use tokio's executor? I already have
#[tokio::main(threaded_scheduler)]
inmain
- Is it nessecary to wrap
TcpStream
withBufStream
while I already configured a big send/recv buffer on socket? Orhyper::Client
internally implemented buffer already? - The buffer size of
BufStream
should be bigger than the buffer ofTcpStream
? - In the bottom are the calling logs of those methods fired by
hyper::Client
. It seemspoll_flush
is called frequently. I checked the source code and seemspoll_flush
is to load the reader buffer rather than flushing the writer buffer?
01-15 10:52:41 [src\upstream.rs:99] connected
01-15 10:52:41 [src\upstream.rs:61] poll_read
01-15 10:52:41 [src\upstream.rs:86] poll_flush
01-15 10:52:41 [src\upstream.rs:61] poll_read
01-15 10:52:41 [src\upstream.rs:81] poll_write_buf
01-15 10:52:41 [src\upstream.rs:86] poll_flush
01-15 10:52:42 [src\upstream.rs:61] poll_read
01-15 10:52:42 [src\upstream.rs:61] poll_read
01-15 10:52:42 [src\upstream.rs:61] poll_read
01-15 10:52:42 [src\upstream.rs:86] poll_flush
01-15 10:52:42 [src\upstream.rs:61] poll_read
01-15 10:52:42 [src\upstream.rs:86] poll_flush
01-15 10:52:57 [src\upstream.rs:61] poll_read
01-15 10:52:57 [src\upstream.rs:81] poll_write_buf
01-15 10:52:57 [src\upstream.rs:86] poll_flush
01-15 10:52:57 [src\upstream.rs:61] poll_read
01-15 10:52:57 [src\upstream.rs:61] poll_read
01-15 10:52:57 [src\upstream.rs:61] poll_read
01-15 10:52:57 [src\upstream.rs:86] poll_flush
01-15 10:52:57 [src\upstream.rs:61] poll_read
01-15 10:52:57 [src\upstream.rs:86] poll_flush
01-15 10:53:07 [src\upstream.rs:99] connected
01-15 10:53:07 [src\upstream.rs:61] poll_read
01-15 10:53:07 [src\upstream.rs:86] poll_flush
01-15 10:53:07 [src\upstream.rs:61] poll_read
01-15 10:53:07 [src\upstream.rs:81] poll_write_buf
01-15 10:53:07 [src\upstream.rs:86] poll_flush
01-15 10:53:08 [src\upstream.rs:61] poll_read
01-15 10:53:08 [src\upstream.rs:86] poll_flush
01-15 10:53:08 [src\upstream.rs:61] poll_read
01-15 10:53:08 [src\upstream.rs:86] poll_flush
01-15 10:53:08 [src\upstream.rs:61] poll_read
01-15 10:53:08 [src\upstream.rs:86] poll_flush
01-15 10:53:08 [src\upstream.rs:61] poll_read
01-15 10:53:08 [src\upstream.rs:61] poll_read
01-15 10:53:08 [src\upstream.rs:61] poll_read
01-15 10:53:08 [src\upstream.rs:86] poll_flush
01-15 10:53:08 [src\upstream.rs:61] poll_read
01-15 10:53:08 [src\upstream.rs:86] poll_flush
01-15 10:53:08 [src\upstream.rs:61] poll_read
01-15 10:53:08 [src\upstream.rs:86] poll_flush
01-15 10:53:11 [src\upstream.rs:61] poll_read
01-15 10:53:11 [src\upstream.rs:81] poll_write_buf
01-15 10:53:11 [src\upstream.rs:86] poll_flush
01-15 10:53:11 [src\upstream.rs:61] poll_read
01-15 10:53:11 [src\upstream.rs:86] poll_flush
01-15 10:53:11 [src\upstream.rs:61] poll_read
01-15 10:53:11 [src\upstream.rs:86] poll_flush
01-15 10:53:11 [src\upstream.rs:61] poll_read
01-15 10:53:11 [src\upstream.rs:61] poll_read
01-15 10:53:11 [src\upstream.rs:61] poll_read
01-15 10:53:11 [src\upstream.rs:86] poll_flush
01-15 10:53:11 [src\upstream.rs:61] poll_read
01-15 10:53:11 [src\upstream.rs:86] poll_flush
01-15 10:53:11 [src\upstream.rs:61] poll_read
01-15 10:53:11 [src\upstream.rs:86] poll_flush
01-15 10:53:42 [src\upstream.rs:61] poll_read
01-15 10:53:42 [src\upstream.rs:86] poll_flush
01-15 10:53:42 [src\upstream.rs:91] poll_shutdown
01-15 10:54:08 [src\upstream.rs:61] poll_read
01-15 10:54:08 [src\upstream.rs:86] poll_flush
01-15 10:54:08 [src\upstream.rs:91] poll_shutdown
01-15 10:55:17 [src\upstream.rs:99] connected
01-15 10:55:17 [src\upstream.rs:61] poll_read
01-15 10:55:17 [src\upstream.rs:86] poll_flush
01-15 10:55:17 [src\upstream.rs:61] poll_read
01-15 10:55:17 [src\upstream.rs:81] poll_write_buf
01-15 10:55:17 [src\upstream.rs:86] poll_flush
01-15 10:55:17 [src\upstream.rs:61] poll_read
01-15 10:55:17 [src\upstream.rs:61] poll_read
01-15 10:55:17 [src\upstream.rs:61] poll_read
01-15 10:55:17 [src\upstream.rs:86] poll_flush
01-15 10:55:17 [src\upstream.rs:61] poll_read
01-15 10:55:17 [src\upstream.rs:86] poll_flush
01-15 10:55:17 [src\upstream.rs:61] poll_read
01-15 10:55:17 [src\upstream.rs:81] poll_write_buf
01-15 10:55:17 [src\upstream.rs:86] poll_flush
01-15 10:55:18 [src\upstream.rs:61] poll_read
01-15 10:55:18 [src\upstream.rs:61] poll_read
01-15 10:55:18 [src\upstream.rs:61] poll_read
01-15 10:55:18 [src\upstream.rs:86] poll_flush
01-15 10:55:18 [src\upstream.rs:61] poll_read
01-15 10:55:18 [src\upstream.rs:86] poll_flush
01-15 10:55:18 [src\upstream.rs:61] poll_read
01-15 10:55:18 [src\upstream.rs:86] poll_flush
01-15 10:55:20 [src\upstream.rs:61] poll_read
01-15 10:55:20 [src\upstream.rs:81] poll_write_buf
01-15 10:55:20 [src\upstream.rs:86] poll_flush
01-15 10:55:20 [src\upstream.rs:61] poll_read
01-15 10:55:20 [src\upstream.rs:61] poll_read
01-15 10:55:20 [src\upstream.rs:61] poll_read
01-15 10:55:20 [src\upstream.rs:86] poll_flush
01-15 10:55:20 [src\upstream.rs:61] poll_read
01-15 10:55:20 [src\upstream.rs:86] poll_flush
01-15 10:55:20 [src\upstream.rs:61] poll_read
01-15 10:55:20 [src\upstream.rs:81] poll_write_buf
01-15 10:55:20 [src\upstream.rs:86] poll_flush
01-15 10:55:21 [src\upstream.rs:61] poll_read
01-15 10:55:21 [src\upstream.rs:61] poll_read
01-15 10:55:21 [src\upstream.rs:61] poll_read
01-15 10:55:21 [src\upstream.rs:86] poll_flush
01-15 10:55:21 [src\upstream.rs:61] poll_read
01-15 10:55:21 [src\upstream.rs:86] poll_flush
01-15 10:55:21 [src\upstream.rs:61] poll_read
01-15 10:55:21 [src\upstream.rs:86] poll_flush
01-15 10:55:22 [src\upstream.rs:61] poll_read
01-15 10:55:22 [src\upstream.rs:81] poll_write_buf
01-15 10:55:22 [src\upstream.rs:86] poll_flush
01-15 10:55:22 [src\upstream.rs:61] poll_read
01-15 10:55:22 [src\upstream.rs:61] poll_read
01-15 10:55:22 [src\upstream.rs:61] poll_read
01-15 10:55:22 [src\upstream.rs:86] poll_flush
01-15 10:55:22 [src\upstream.rs:61] poll_read
01-15 10:55:22 [src\upstream.rs:86] poll_flush
01-15 10:55:22 [src\upstream.rs:61] poll_read
01-15 10:55:22 [src\upstream.rs:81] poll_write_buf
01-15 10:55:22 [src\upstream.rs:86] poll_flush
01-15 10:55:22 [src\upstream.rs:61] poll_read
01-15 10:55:22 [src\upstream.rs:61] poll_read
01-15 10:55:22 [src\upstream.rs:61] poll_read
01-15 10:55:22 [src\upstream.rs:86] poll_flush
01-15 10:55:22 [src\upstream.rs:61] poll_read
01-15 10:55:22 [src\upstream.rs:86] poll_flush
01-15 10:55:22 [src\upstream.rs:61] poll_read
01-15 10:55:22 [src\upstream.rs:86] poll_flush
01-15 10:55:24 [src\upstream.rs:61] poll_read
01-15 10:55:24 [src\upstream.rs:81] poll_write_buf
01-15 10:55:24 [src\upstream.rs:86] poll_flush
01-15 10:55:24 [src\upstream.rs:61] poll_read
01-15 10:55:24 [src\upstream.rs:61] poll_read
01-15 10:55:24 [src\upstream.rs:61] poll_read
01-15 10:55:24 [src\upstream.rs:86] poll_flush
01-15 10:55:24 [src\upstream.rs:61] poll_read
01-15 10:55:24 [src\upstream.rs:86] poll_flush
01-15 10:55:24 [src\upstream.rs:61] poll_read
01-15 10:55:24 [src\upstream.rs:81] poll_write_buf
01-15 10:55:24 [src\upstream.rs:86] poll_flush
01-15 10:55:44 [src\upstream.rs:61] poll_read
01-15 10:55:44 [src\upstream.rs:86] poll_flush
01-15 10:55:44 [src\upstream.rs:61] poll_read
01-15 10:55:44 [src\upstream.rs:86] poll_flush
01-15 10:55:44 [src\upstream.rs:61] poll_read
01-15 10:55:44 [src\upstream.rs:86] poll_flush
01-15 10:55:44 [src\upstream.rs:61] poll_read
01-15 10:55:44 [src\upstream.rs:86] poll_flush
01-15 10:55:44 [src\upstream.rs:61] poll_read
01-15 10:55:44 [src\upstream.rs:61] poll_read
01-15 10:55:44 [src\upstream.rs:61] poll_read
01-15 10:55:44 [src\upstream.rs:86] poll_flush
01-15 10:55:44 [src\upstream.rs:61] poll_read
01-15 10:55:44 [src\upstream.rs:86] poll_flush
01-15 10:55:44 [src\upstream.rs:61] poll_read
01-15 10:55:44 [src\upstream.rs:86] poll_flush