Using a custom transporter for hyper http

I'm trying to write a custom transporter for the hyper HTTP crate so I can transport HTTP packets in my own way.

Hyper's HTTP client can be passed a custom hyper::client::connect::Connect through the client builder's build method:

pub fn build<C, B>(&self, connector: C) -> Client<C, B> where C: Connect + Clone, B: HttpBody + Send, B::Data: Send,

If we look at

impl<S, T> Connect for S
where
    S: Service<Uri, Response = T> + Send + 'static,
    S::Error: Into<Box<dyn StdError + Send + Sync>>,
    S::Future: Unpin + Send,
    T: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static,

the type T, which is the type of the Response, must implement AsyncRead + AsyncWrite (as well as Connection), so I've chosen a Response type that wraps a Cursor<Vec<u8>>.
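One thing worth checking about that choice: a Cursor keeps a single position that reads and writes share, and a pre-filled Cursor is readable before anything has been written to it. The sketch below uses the synchronous std::io traits only, on the assumption that tokio's AsyncRead/AsyncWrite impls for Cursor behave the same way; cursor_demo is a made-up helper name for illustration.

```rust
use std::io::{Cursor, Read, Write};

/// Returns (bytes readable before any write, bytes readable after a 4-byte write).
fn cursor_demo() -> (Vec<u8>, Vec<u8>) {
    // A pre-filled buffer: its bytes are readable before anything was written.
    let mut fresh = Cursor::new(b"HTTP/1.1 200 OK\r\n".to_vec());
    let mut before = Vec::new();
    fresh.read_to_end(&mut before).unwrap();

    // Reads and writes share one position: the write overwrites the start
    // of the buffer and moves the read position past the written bytes.
    let mut shared = Cursor::new(b"HTTP/1.1 200 OK\r\n".to_vec());
    shared.write_all(b"GET ").unwrap();
    let mut after = Vec::new();
    shared.read_to_end(&mut after).unwrap();

    (before, after)
}

fn main() {
    let (before, after) = cursor_demo();
    println!("before any write: {:?}", String::from_utf8_lossy(&before));
    println!("after a write:    {:?}", String::from_utf8_lossy(&after));
}
```

If the same holds for the async impls, the request that hyper writes and the canned response would end up sharing (and clobbering) one buffer.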

Here's my CustomTransporter:

use hyper::service::Service;
use core::task::{Context, Poll};
use core::future::Future;
use std::pin::Pin;
use std::io::Cursor;
use hyper::client::connect::{Connection, Connected};
use tokio::io::{AsyncRead, AsyncWrite};

#[derive(Clone)]
pub struct CustomTransporter;

// CustomTransporter is a unit struct, so it is Send automatically; no unsafe impl is needed.

impl CustomTransporter {
    pub fn new() -> CustomTransporter {
        CustomTransporter{}
    }
}

impl Connection for CustomTransporter {
    fn connected(&self) -> Connected {
        Connected::new()
    }
}

pub struct CustomResponse {
    w: Cursor<Vec<u8>>
}

// Cursor<Vec<u8>> is Send, so CustomResponse is Send automatically; no unsafe impl is needed.

impl Connection for CustomResponse {
    fn connected(&self) -> Connected {
        println!("connected");
        Connected::new()
    }
}

impl AsyncRead for CustomResponse {
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut tokio::io::ReadBuf<'_>
    ) -> Poll<std::io::Result<()>> {
        println!("poll_read");
        let r = Pin::new(&mut self.w).poll_read(cx, buf);
        println!("did poll_read");
        r
    }
}

impl AsyncWrite for CustomResponse {
    fn poll_write(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8]
    ) -> Poll<Result<usize, std::io::Error>>{
        println!("poll_write");
        Pin::new(&mut self.w).poll_write(cx, buf)
    }
    fn poll_flush(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>
    ) -> Poll<Result<(), std::io::Error>> {
        println!("poll_flush");
        Pin::new(&mut self.w).poll_flush(cx)
    }

    fn poll_shutdown(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>
    ) -> Poll<Result<(), std::io::Error>>
    {
        println!("poll_shutdown");
        Pin::new(&mut self.w).poll_shutdown(cx)
    }
}


impl Service<hyper::Uri> for CustomTransporter {
    type Response = CustomResponse;
    type Error = hyper::http::Error;
    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        println!("poll_ready");
        Poll::Ready(Ok(()))
    }

    fn call(&mut self, req: hyper::Uri) -> Self::Future {
        println!("call");
        // create the body
        let body: Vec<u8> = "HTTP/1.1 200 OK
Date: Sat, 06 Feb 2021 03:35:23 GMT
Server: Apache/2.4.34 (Amazon) OpenSSL/1.0.2k-fips PHP/5.5.38
Last-Modified: Wed, 23 Dec 2015 01:18:20 GMT
ETag: \"353-527867f65e8ad\"
Accept-Ranges: bytes
Content-Length: 851
Keep-Alive: timeout=5, max=100
Connection: Keep-Alive
Content-Type: text/html; charset=UTF-8\n
hello".as_bytes()
            .to_owned();
        // Create the HTTP response
        let resp = CustomResponse{
            w: Cursor::new(body)
        };
         
        // create a response in a future.
        let fut = async {
            Ok(resp)
        };
        println!("gonna return from call");
        // Return the response as an immediate future
        Box::pin(fut)
    }
}
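A side note on the canned response in call: the string literal uses bare newlines between header lines, and it declares Content-Length: 851 while the body is the 5-byte "hello". HTTP/1.1 specifies CRLF line endings, and a length mismatch would likely leave the client waiting for bytes that never arrive. A minimal sketch of building a self-consistent response follows; canned_response is a hypothetical helper, not part of hyper, and this is not claimed to be the cause of the error below.

```rust
/// Builds a minimal HTTP/1.1 response whose Content-Length matches the body.
fn canned_response(body: &str) -> Vec<u8> {
    // Header lines end in CRLF and a blank CRLF line separates head and body.
    format!(
        "HTTP/1.1 200 OK\r\n\
         Content-Type: text/html; charset=UTF-8\r\n\
         Content-Length: {}\r\n\
         \r\n\
         {}",
        body.len(),
        body
    )
    .into_bytes()
}

fn main() {
    let bytes = canned_response("hello");
    print!("{}", String::from_utf8_lossy(&bytes));
}
```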

I call it like this:

let connector = CustomTransporter::new();
let client: Client<CustomTransporter, hyper::Body> = Client::builder().build(connector);
let mut res = client.get(url).await.unwrap();

This is the error I get:

thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: hyper::Error(ChannelClosed)', src/main.rs:49:41

The error is in the line

let mut res = client.get(url).await.unwrap();

I traced the error to this code in the hyper library [1]:

impl<T, U> Sender<T, U> {
    pub(crate) fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
        self.giver
            .poll_want(cx)
            .map_err(|_| crate::Error::new_closed())
    }

but I don't know why.

[1] https://github.com/hyperium/hyper/blob/4c946af49cc7fbbc6bd4894283a654625c2ea383/src/client/client.rs#L389 (where the error comes from)

After enabling more verbose logging, I got this more specific error:

received unexpected 325 bytes on an idle connection

right from this line:

https://github.com/hyperium/hyper/blob/4c946af49cc7fbbc6bd4894283a654625c2ea383/src/proto/h1/conn.rs#L326

Looks like something in my code makes hyper call require_empty_read.
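One plausible reading of that message is that hyper found response bytes readable on the connection before it had written any request, which is what a pre-filled Cursor would produce. Below is a rough synchronous model of a transport that avoids this by keeping request and response bytes in separate buffers and releasing the response only once a full request head has been written. LoopbackTransport and its fields are hypothetical names, and the model uses std::io traits rather than tokio's. A real AsyncRead impl would return Poll::Pending and keep the waker instead of returning WouldBlock; tokio::io::duplex may be a simpler way to get a connected in-memory pair.

```rust
use std::collections::VecDeque;
use std::io::{self, Read, Write};

/// Hypothetical in-memory transport with separate request and response buffers.
struct LoopbackTransport {
    request: Vec<u8>,       // bytes the client has written so far
    response: Vec<u8>,      // canned response, not yet readable
    readable: VecDeque<u8>, // bytes the client may read right now
}

impl LoopbackTransport {
    fn new(response: Vec<u8>) -> Self {
        LoopbackTransport { request: Vec::new(), response, readable: VecDeque::new() }
    }
}

impl Write for LoopbackTransport {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.request.extend_from_slice(buf);
        // Release the response once the request head (ending in a blank line) is complete.
        let head_done = self.request.windows(4).any(|w| w == b"\r\n\r\n");
        if head_done && !self.response.is_empty() {
            self.readable.extend(self.response.drain(..));
        }
        Ok(buf.len())
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

impl Read for LoopbackTransport {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        if self.readable.is_empty() {
            // Nothing to read yet: the synchronous stand-in for Poll::Pending.
            return Err(io::ErrorKind::WouldBlock.into());
        }
        let n = buf.len().min(self.readable.len());
        for (slot, byte) in buf.iter_mut().zip(self.readable.drain(..n)) {
            *slot = byte;
        }
        Ok(n)
    }
}

fn main() {
    let mut t = LoopbackTransport::new(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n".to_vec());
    let mut buf = [0u8; 64];
    // Idle connection: nothing is readable before a request has been written.
    assert!(t.read(&mut buf).is_err());
    t.write_all(b"GET / HTTP/1.1\r\nHost: example\r\n\r\n").unwrap();
    let n = t.read(&mut buf).unwrap();
    println!("{}", String::from_utf8_lossy(&buf[..n]));
}
```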

Here's a complete project for testing: https://github.com/lzunsec/rust_hyper_custom_transporter/tree/c0fb75d9ed8fdf854888ccd605063ca433383689
