I'm trying to write a custom transporter for the hyper HTTP crate so I can transport HTTP messages in my own way.
Hyper's HTTP client can be given a custom hyper::client::connect::Connect through the builder:
pub fn build<C, B>(&self, connector: C) -> Client<C, B>
where
    C: Connect + Clone,
    B: HttpBody + Send,
    B::Data: Send,
If we look at
impl<S, T> Connect for S
where
    S: Service<Uri, Response = T> + Send + 'static,
    S::Error: Into<Box<dyn StdError + Send + Sync>>,
    S::Future: Unpin + Send,
    T: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static,
the type T, which is the type of the Response, must implement AsyncRead + AsyncWrite, so I've chosen a response type that wraps a Cursor<Vec<u8>>.
Here's my CustomTransporter:
use hyper::service::Service;
use core::task::{Context, Poll};
use core::future::Future;
use std::pin::Pin;
use std::io::Cursor;
use hyper::client::connect::{Connection, Connected};
use tokio::io::{AsyncRead, AsyncWrite};

#[derive(Clone)]
pub struct CustomTransporter;

unsafe impl Send for CustomTransporter {}

impl CustomTransporter {
    pub fn new() -> CustomTransporter {
        CustomTransporter {}
    }
}

impl Connection for CustomTransporter {
    fn connected(&self) -> Connected {
        Connected::new()
    }
}

pub struct CustomResponse {
    w: Cursor<Vec<u8>>,
}

unsafe impl Send for CustomResponse {}

impl Connection for CustomResponse {
    fn connected(&self) -> Connected {
        println!("connected");
        Connected::new()
    }
}

impl AsyncRead for CustomResponse {
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut tokio::io::ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
        println!("poll_read");
        let r = Pin::new(&mut self.w).poll_read(cx, buf);
        println!("did poll_read");
        r
    }
}

impl AsyncWrite for CustomResponse {
    fn poll_write(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<Result<usize, std::io::Error>> {
        //let v = vec!();
        println!("poll_write");
        Pin::new(&mut self.w).poll_write(cx, buf)
    }

    fn poll_flush(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Result<(), std::io::Error>> {
        println!("poll_flush");
        Pin::new(&mut self.w).poll_flush(cx)
    }

    fn poll_shutdown(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Result<(), std::io::Error>> {
        println!("poll_shutdown");
        Pin::new(&mut self.w).poll_shutdown(cx)
    }
}

impl Service<hyper::Uri> for CustomTransporter {
    type Response = CustomResponse;
    type Error = hyper::http::Error;
    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        println!("poll_ready");
        Poll::Ready(Ok(()))
    }

    fn call(&mut self, req: hyper::Uri) -> Self::Future {
        println!("call");
        // create the body
        let body: Vec<u8> = "HTTP/1.1 200 OK
Date: Sat, 06 Feb 2021 03:35:23 GMT
Server: Apache/2.4.34 (Amazon) OpenSSL/1.0.2k-fips PHP/5.5.38
Last-Modified: Wed, 23 Dec 2015 01:18:20 GMT
ETag: \"353-527867f65e8ad\"
Accept-Ranges: bytes
Content-Length: 851
Keep-Alive: timeout=5, max=100
Connection: Keep-Alive
Content-Type: text/html; charset=UTF-8\n
hello".as_bytes()
            .to_owned();
        // Create the HTTP response
        let resp = CustomResponse {
            w: Cursor::new(body),
        };
        // create a response in a future.
        let fut = async {
            Ok(resp)
        };
        println!("gonna return from call");
        // Return the response as an immediate future
        Box::pin(fut)
    }
}
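A side note on the canned response built in call: HTTP/1.1 separates header lines with CRLF, and Content-Length is expected to equal the number of body bytes, whereas the literal above appears to use bare \n line endings and declares 851 bytes for a 5-byte body. Here is a minimal sketch of a helper that keeps the two consistent. The function name canned_response is hypothetical and not part of the original project:

```rust
/// Hypothetical helper (not in the original project): builds a raw HTTP/1.1
/// response whose header lines end in CRLF and whose Content-Length is
/// computed from the body instead of being hard-coded.
pub fn canned_response(body: &str) -> Vec<u8> {
    format!(
        "HTTP/1.1 200 OK\r\n\
         Content-Type: text/html; charset=UTF-8\r\n\
         Content-Length: {}\r\n\
         \r\n\
         {}",
        body.len(), // byte length of the body, not the character count
        body
    )
    .into_bytes()
}
```

In call, Cursor::new(canned_response("hello")) could then stand in for the hand-written literal. This only tidies the payload; it does not change when the bytes become readable.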
I call it like this:
let connector = CustomTransporter::new();
let client: Client<CustomTransporter, hyper::Body> = Client::builder().build(connector);
let mut res = client.get(url).await.unwrap();
This is the error I get:
thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: hyper::Error(ChannelClosed)', src/main.rs:49:41
The error is in the line
let mut res = client.get(url).await.unwrap();
I traced the error to exactly this code in the hyper library:
impl<T, U> Sender<T, U> {
    pub(crate) fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
        self.giver
            .poll_want(cx)
            .map_err(|_| crate::Error::new_closed())
    }
but I don't know why.
After adding some verbosity, I got this more specific error:
received unexpected 325 bytes on an idle connection
This message comes from inside hyper. It looks like something in my code makes hyper call require_empty_read.
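One plausible reading of that log line, offered as an assumption rather than a confirmed diagnosis: CustomResponse wraps a Cursor that is already filled with the response, so the very first read returns bytes before any request has been written, while hyper still considers the connection idle. The sketch below illustrates the idea with a synchronous analog built on std::io::Read and std::io::Write. The type name ReplyAfterRequest and the function first_read_from_preloaded are hypothetical, and an AsyncRead version would have to return Poll::Pending and wake the stored waker from poll_write instead of returning WouldBlock:

```rust
use std::io::{self, Cursor, Read, Write};

/// Reads once from a Cursor that was created already holding the response,
/// which is how the Cursor inside CustomResponse is set up.
pub fn first_read_from_preloaded(response: &[u8]) -> usize {
    let mut cursor = Cursor::new(response.to_vec());
    let mut buf = [0u8; 1024];
    // Returns the response bytes immediately, even though nothing was sent.
    cursor.read(&mut buf).unwrap()
}

/// Hypothetical mock stream: records what the client writes and only
/// exposes the canned response once a request has been written.
pub struct ReplyAfterRequest {
    request: Vec<u8>,
    response: Cursor<Vec<u8>>,
}

impl ReplyAfterRequest {
    pub fn new(response: Vec<u8>) -> Self {
        Self { request: Vec::new(), response: Cursor::new(response) }
    }

    /// Bytes the client has written so far.
    pub fn request(&self) -> &[u8] {
        &self.request
    }
}

impl Read for ReplyAfterRequest {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        if self.request.is_empty() {
            // No request yet: report "no data for now" instead of replying.
            // (Sync stand-in for Poll::Pending in an AsyncRead impl.)
            return Err(io::ErrorKind::WouldBlock.into());
        }
        self.response.read(buf)
    }
}

impl Write for ReplyAfterRequest {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        // Keep the request so the transporter could inspect or forward it.
        self.request.extend_from_slice(buf);
        Ok(buf.len())
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}
```

With this shape, a read before any write yields no data, and the response only becomes readable after the request bytes have been handed to the stream.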
Here's a complete project for testing: https://github.com/lzunsec/rust_hyper_custom_transporter/tree/c0fb75d9ed8fdf854888ccd605063ca433383689