Should I use BufStream<TcpStream> in hyper::Client?

I have a reverse proxy built on hyper v0.13 and tokio v0.2. Recently I have been running into a potential performance issue, and I am trying to improve it by introducing tokio::io::BufStream for the upstream connections.

Here is a brief version of the server code:


/// Runs the reverse-proxy HTTP server on `0.0.0.0:port` until the server future completes.
///
/// Every incoming request is handed to the `hyper::Client` returned by `get_client` and
/// the upstream response future is returned as the service response.
///
/// `context` is cloned once per connection and once per request so that each async
/// block owns its own copy.
///
/// # Errors
///
/// Returns the `hyper` error produced by the server future, if any.
pub async fn run(port:u16, context : HttpRuntimeContext) -> Result<(), hyper::error::Error> {

    // One service is created per accepted connection.
    let make_service = make_service_fn(move |_| {
        let context = context.clone();
        async move {
            Ok::<_, hyper::Error>(service_fn( move |mut req| {
                let context = context.clone();
                async move {
                   //...
                    // NOTE(review): `addr` here is presumably the upstream address computed in
                    // the elided section above, not the listening `addr` defined below — confirm.
                    let client = get_client(&addr);
                    client.request(req).await
                }
            }))
        }
    });

    // Listen on all IPv4 interfaces.
    let addr = SocketAddr::from(([0, 0, 0, 0], port));
    let server = Server::bind(&addr)
        // 1024 * 1024 bytes = 1 MiB maximum HTTP/1 buffer size.
        .http1_max_buf_size(1024*1024)
        // TCP keepalive of 300 seconds on accepted connections.
        .tcp_keepalive(Some(Duration::from_secs(300)))
        .serve(make_service);
    server.await?;

    Ok(())
}



lazy_static! {
    /// Process-wide cache of upstream clients, keyed by the target server address string.
    ///
    /// Each cached `Client` is shared through an `Arc`, so every request to the same
    /// upstream goes through the same client instance.
    static ref HTTP_CLIENTS: RwLock<HashMap<String, Arc<Client<UpstreamConnector>>>> =
        RwLock::new(HashMap::default());
}


/// Returns the shared `hyper::Client` for `server_addr`, creating and caching it on first use.
///
/// The lookup takes the read lock first so that the common case (client already cached)
/// does not contend with other readers. On a miss the write lock is taken and the map is
/// consulted again through the `entry` API: if another task inserted a client between the
/// two lock acquisitions, that client is returned instead of building a second one. This
/// guarantees a single `Client` per address, which the previous check-then-insert sequence
/// did not.
///
/// # Panics
///
/// Panics if the `HTTP_CLIENTS` lock is poisoned.
fn get_client(server_addr : &str) -> Arc<Client<UpstreamConnector>> {

    // Fast path: shared read lock, released at the end of this scope.
    {
        let reader = HTTP_CLIENTS.read().unwrap();
        if let Some(client) = reader.get(server_addr) {
            return Arc::clone(client);
        }
    }

    // Slow path: re-check under the write lock so that concurrent callers that all missed
    // above end up sharing one client rather than each overwriting the previous entry.
    let mut writer = HTTP_CLIENTS.write().unwrap();
    let client = writer.entry(server_addr.to_string()).or_insert_with(|| {
        let connector = UpstreamConnector::new(&server_addr);

        Arc::new(
            Client::builder()
                .pool_idle_timeout(Duration::from_secs(30))
                .pool_max_idle_per_host(25)
                .build(connector),
        )
    });

    Arc::clone(client)
}

UpstreamConnector is used to establish a TCP connection to the backend:

/// Connector handed to `hyper::Client`: every call opens a fresh TCP connection to the
/// connector's fixed `target`, regardless of the requested URI.
impl Service<Uri> for UpstreamConnector {
    type Response = BufferedTcpStream;
    type Error = std::io::Error;
    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

    /// Always reports readiness; this connector keeps no state that could exert backpressure.
    fn poll_ready(&mut self, _cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    /// Connects to `self.target` and wraps the socket in a `BufferedTcpStream`.
    ///
    /// The URI argument is ignored. A connect failure is returned as the future's error.
    fn call(&mut self, _uri: Uri) -> Self::Future {
        let target = self.target.clone();
        Box::pin(async move {
            let socket = TcpStream::connect(target).await?;

            // Socket buffer sizing (512 KiB each way) is best effort: a failure is ignored.
            let _ = socket.set_send_buffer_size(512 * 1024);
            let _ = socket.set_recv_buffer_size(512 * 1024);

            // 1 MiB read buffer and 1 MiB write buffer in the BufStream wrapper.
            let buffered = BufStream::with_capacity(1024 * 1024, 1024 * 1024, socket);
            Ok(BufferedTcpStream(buffered))
        })
    }
}

The connector's response type used to be tokio::net::TcpStream; I replaced it with my own wrapper around BufStream<TcpStream>, as shown below:

use tokio::prelude::*;
use tokio::net::TcpStream;
use tokio::io::BufStream;
/// Newtype around `BufStream<TcpStream>`.
///
/// A local wrapper type is what allows the `hyper::client::connect::Connection` trait to be
/// implemented for the buffered stream in this crate.
pub struct BufferedTcpStream(BufStream<TcpStream>);


/// Read half of the wrapper: reads are served by the inner `BufStream`.
///
/// The previous implementation polled `self.0.get_mut()`, which is the raw `TcpStream`
/// underneath the `BufStream`. That bypassed the read buffer completely, so the buffer
/// allocated in the connector was never used. Polling `self.0` itself routes reads through
/// the buffer as intended.
impl AsyncRead for BufferedTcpStream {
    /// Reports that the caller's buffer does not need to be zeroed before `poll_read`.
    ///
    /// NOTE(review): this relies on neither `BufStream` nor `TcpStream` reading from the
    /// destination buffer before writing to it — confirm against the tokio version in use.
    unsafe fn prepare_uninitialized_buffer(&self, _: &mut [std::mem::MaybeUninit<u8>]) -> bool {
        false
    }

    /// Polls the buffered stream for data, copying into `buf`.
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut core::task::Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        // `&mut self.0` is the BufStream; `self.0.get_mut()` would be the unbuffered socket.
        Pin::new(&mut self.0).poll_read(cx, buf)
    }
}

/// Write half of the wrapper: writes, flushes and shutdown all go through the inner
/// `BufStream`.
///
/// The previous implementation polled `self.0.get_mut()`, which is the raw `TcpStream`
/// underneath the `BufStream`, so the write buffer was never used. All four methods are
/// switched together on purpose: writing into the buffer while flushing or shutting down
/// the raw socket would leave buffered bytes unsent.
impl AsyncWrite for BufferedTcpStream {
    /// Writes `buf` through the buffered stream.
    fn poll_write(
        mut self: Pin<&mut Self>,
        cx: &mut core::task::Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        Pin::new(&mut self.0).poll_write(cx, buf)
    }

    /// Writes from a `Buf` through the buffered stream and advances it by the amount written.
    fn poll_write_buf<B: Buf>(
        mut self: Pin<&mut Self>,
        cx: &mut core::task::Context<'_>,
        buf: &mut B,
    ) -> Poll<io::Result<usize>> {
        Pin::new(&mut self.0).poll_write_buf(cx, buf)
    }

    /// Flushes the buffered stream, pushing any pending buffered bytes to the socket.
    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut core::task::Context<'_>) -> Poll<io::Result<()>> {
        Pin::new(&mut self.0).poll_flush(cx)
    }

    /// Shuts down the write side via the buffered stream so that pending bytes are not dropped.
    fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut core::task::Context<'_>) -> Poll<io::Result<()>> {
        Pin::new(&mut self.0).poll_shutdown(cx)
    }
}


/// Lets `hyper::Client` query connection metadata for the wrapper.
impl hyper::client::connect::Connection for BufferedTcpStream {
    /// Forwards to the `Connection` implementation of the underlying `TcpStream`.
    fn connected(&self) -> Connected {
        let tcp = self.0.get_ref();
        tcp.connected()
    }
}

Questions :

  • Any glaring issue?
  • Should I set the hyper::client::executor to use tokio's executor? I already have #[tokio::main(threaded_scheduler)] in main
  • Is it necessary to wrap TcpStream in BufStream when I have already configured large send/recv buffers on the socket? Or does hyper::Client already implement its own buffering internally?
  • Should the buffer size of BufStream be larger than the socket buffer size of TcpStream?
  • Below are the call logs of the methods invoked by hyper::Client. It seems poll_flush is called frequently. I checked the source code, and it seems that poll_flush loads the reader buffer rather than flushing the writer buffer. Is that correct?


01-15 10:52:41  [src\upstream.rs:99] connected
01-15 10:52:41  [src\upstream.rs:61] poll_read
01-15 10:52:41  [src\upstream.rs:86] poll_flush
01-15 10:52:41  [src\upstream.rs:61] poll_read
01-15 10:52:41  [src\upstream.rs:81] poll_write_buf
01-15 10:52:41  [src\upstream.rs:86] poll_flush
01-15 10:52:42  [src\upstream.rs:61] poll_read
01-15 10:52:42  [src\upstream.rs:61] poll_read
01-15 10:52:42  [src\upstream.rs:61] poll_read
01-15 10:52:42  [src\upstream.rs:86] poll_flush
01-15 10:52:42  [src\upstream.rs:61] poll_read
01-15 10:52:42  [src\upstream.rs:86] poll_flush
01-15 10:52:57  [src\upstream.rs:61] poll_read
01-15 10:52:57  [src\upstream.rs:81] poll_write_buf
01-15 10:52:57  [src\upstream.rs:86] poll_flush
01-15 10:52:57  [src\upstream.rs:61] poll_read
01-15 10:52:57  [src\upstream.rs:61] poll_read
01-15 10:52:57  [src\upstream.rs:61] poll_read
01-15 10:52:57  [src\upstream.rs:86] poll_flush
01-15 10:52:57  [src\upstream.rs:61] poll_read
01-15 10:52:57  [src\upstream.rs:86] poll_flush
01-15 10:53:07  [src\upstream.rs:99] connected
01-15 10:53:07  [src\upstream.rs:61] poll_read
01-15 10:53:07  [src\upstream.rs:86] poll_flush
01-15 10:53:07  [src\upstream.rs:61] poll_read
01-15 10:53:07  [src\upstream.rs:81] poll_write_buf
01-15 10:53:07  [src\upstream.rs:86] poll_flush
01-15 10:53:08  [src\upstream.rs:61] poll_read
01-15 10:53:08  [src\upstream.rs:86] poll_flush
01-15 10:53:08  [src\upstream.rs:61] poll_read
01-15 10:53:08  [src\upstream.rs:86] poll_flush
01-15 10:53:08  [src\upstream.rs:61] poll_read
01-15 10:53:08  [src\upstream.rs:86] poll_flush
01-15 10:53:08  [src\upstream.rs:61] poll_read
01-15 10:53:08  [src\upstream.rs:61] poll_read
01-15 10:53:08  [src\upstream.rs:61] poll_read
01-15 10:53:08  [src\upstream.rs:86] poll_flush
01-15 10:53:08  [src\upstream.rs:61] poll_read
01-15 10:53:08  [src\upstream.rs:86] poll_flush
01-15 10:53:08  [src\upstream.rs:61] poll_read
01-15 10:53:08  [src\upstream.rs:86] poll_flush
01-15 10:53:11  [src\upstream.rs:61] poll_read
01-15 10:53:11  [src\upstream.rs:81] poll_write_buf
01-15 10:53:11  [src\upstream.rs:86] poll_flush
01-15 10:53:11  [src\upstream.rs:61] poll_read
01-15 10:53:11  [src\upstream.rs:86] poll_flush
01-15 10:53:11  [src\upstream.rs:61] poll_read
01-15 10:53:11  [src\upstream.rs:86] poll_flush
01-15 10:53:11  [src\upstream.rs:61] poll_read
01-15 10:53:11  [src\upstream.rs:61] poll_read
01-15 10:53:11  [src\upstream.rs:61] poll_read
01-15 10:53:11  [src\upstream.rs:86] poll_flush
01-15 10:53:11  [src\upstream.rs:61] poll_read
01-15 10:53:11  [src\upstream.rs:86] poll_flush
01-15 10:53:11  [src\upstream.rs:61] poll_read
01-15 10:53:11  [src\upstream.rs:86] poll_flush
01-15 10:53:42  [src\upstream.rs:61] poll_read
01-15 10:53:42  [src\upstream.rs:86] poll_flush
01-15 10:53:42  [src\upstream.rs:91] poll_shutdown
01-15 10:54:08  [src\upstream.rs:61] poll_read
01-15 10:54:08  [src\upstream.rs:86] poll_flush
01-15 10:54:08  [src\upstream.rs:91] poll_shutdown
01-15 10:55:17  [src\upstream.rs:99] connected
01-15 10:55:17  [src\upstream.rs:61] poll_read
01-15 10:55:17  [src\upstream.rs:86] poll_flush
01-15 10:55:17  [src\upstream.rs:61] poll_read
01-15 10:55:17  [src\upstream.rs:81] poll_write_buf
01-15 10:55:17  [src\upstream.rs:86] poll_flush
01-15 10:55:17  [src\upstream.rs:61] poll_read
01-15 10:55:17  [src\upstream.rs:61] poll_read
01-15 10:55:17  [src\upstream.rs:61] poll_read
01-15 10:55:17  [src\upstream.rs:86] poll_flush
01-15 10:55:17  [src\upstream.rs:61] poll_read
01-15 10:55:17  [src\upstream.rs:86] poll_flush
01-15 10:55:17  [src\upstream.rs:61] poll_read
01-15 10:55:17  [src\upstream.rs:81] poll_write_buf
01-15 10:55:17  [src\upstream.rs:86] poll_flush
01-15 10:55:18  [src\upstream.rs:61] poll_read
01-15 10:55:18  [src\upstream.rs:61] poll_read
01-15 10:55:18  [src\upstream.rs:61] poll_read
01-15 10:55:18  [src\upstream.rs:86] poll_flush
01-15 10:55:18  [src\upstream.rs:61] poll_read
01-15 10:55:18  [src\upstream.rs:86] poll_flush
01-15 10:55:18  [src\upstream.rs:61] poll_read
01-15 10:55:18  [src\upstream.rs:86] poll_flush
01-15 10:55:20  [src\upstream.rs:61] poll_read
01-15 10:55:20  [src\upstream.rs:81] poll_write_buf
01-15 10:55:20  [src\upstream.rs:86] poll_flush
01-15 10:55:20  [src\upstream.rs:61] poll_read
01-15 10:55:20  [src\upstream.rs:61] poll_read
01-15 10:55:20  [src\upstream.rs:61] poll_read
01-15 10:55:20  [src\upstream.rs:86] poll_flush
01-15 10:55:20  [src\upstream.rs:61] poll_read
01-15 10:55:20  [src\upstream.rs:86] poll_flush
01-15 10:55:20  [src\upstream.rs:61] poll_read
01-15 10:55:20  [src\upstream.rs:81] poll_write_buf
01-15 10:55:20  [src\upstream.rs:86] poll_flush
01-15 10:55:21  [src\upstream.rs:61] poll_read
01-15 10:55:21  [src\upstream.rs:61] poll_read
01-15 10:55:21  [src\upstream.rs:61] poll_read
01-15 10:55:21  [src\upstream.rs:86] poll_flush
01-15 10:55:21  [src\upstream.rs:61] poll_read
01-15 10:55:21  [src\upstream.rs:86] poll_flush
01-15 10:55:21  [src\upstream.rs:61] poll_read
01-15 10:55:21  [src\upstream.rs:86] poll_flush
01-15 10:55:22  [src\upstream.rs:61] poll_read
01-15 10:55:22  [src\upstream.rs:81] poll_write_buf
01-15 10:55:22  [src\upstream.rs:86] poll_flush
01-15 10:55:22  [src\upstream.rs:61] poll_read
01-15 10:55:22  [src\upstream.rs:61] poll_read
01-15 10:55:22  [src\upstream.rs:61] poll_read
01-15 10:55:22  [src\upstream.rs:86] poll_flush
01-15 10:55:22  [src\upstream.rs:61] poll_read
01-15 10:55:22  [src\upstream.rs:86] poll_flush
01-15 10:55:22  [src\upstream.rs:61] poll_read
01-15 10:55:22  [src\upstream.rs:81] poll_write_buf
01-15 10:55:22  [src\upstream.rs:86] poll_flush
01-15 10:55:22  [src\upstream.rs:61] poll_read
01-15 10:55:22  [src\upstream.rs:61] poll_read
01-15 10:55:22  [src\upstream.rs:61] poll_read
01-15 10:55:22  [src\upstream.rs:86] poll_flush
01-15 10:55:22  [src\upstream.rs:61] poll_read
01-15 10:55:22  [src\upstream.rs:86] poll_flush
01-15 10:55:22  [src\upstream.rs:61] poll_read
01-15 10:55:22  [src\upstream.rs:86] poll_flush
01-15 10:55:24  [src\upstream.rs:61] poll_read
01-15 10:55:24  [src\upstream.rs:81] poll_write_buf
01-15 10:55:24  [src\upstream.rs:86] poll_flush
01-15 10:55:24  [src\upstream.rs:61] poll_read
01-15 10:55:24  [src\upstream.rs:61] poll_read
01-15 10:55:24  [src\upstream.rs:61] poll_read
01-15 10:55:24  [src\upstream.rs:86] poll_flush
01-15 10:55:24  [src\upstream.rs:61] poll_read
01-15 10:55:24  [src\upstream.rs:86] poll_flush
01-15 10:55:24  [src\upstream.rs:61] poll_read
01-15 10:55:24  [src\upstream.rs:81] poll_write_buf
01-15 10:55:24  [src\upstream.rs:86] poll_flush
01-15 10:55:44  [src\upstream.rs:61] poll_read
01-15 10:55:44  [src\upstream.rs:86] poll_flush
01-15 10:55:44  [src\upstream.rs:61] poll_read
01-15 10:55:44  [src\upstream.rs:86] poll_flush
01-15 10:55:44  [src\upstream.rs:61] poll_read
01-15 10:55:44  [src\upstream.rs:86] poll_flush
01-15 10:55:44  [src\upstream.rs:61] poll_read
01-15 10:55:44  [src\upstream.rs:86] poll_flush
01-15 10:55:44  [src\upstream.rs:61] poll_read
01-15 10:55:44  [src\upstream.rs:61] poll_read
01-15 10:55:44  [src\upstream.rs:61] poll_read
01-15 10:55:44  [src\upstream.rs:86] poll_flush
01-15 10:55:44  [src\upstream.rs:61] poll_read
01-15 10:55:44  [src\upstream.rs:86] poll_flush
01-15 10:55:44  [src\upstream.rs:61] poll_read
01-15 10:55:44  [src\upstream.rs:86] poll_flush

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.