How to use a custom transporter for this http client?

Hyper's HTTP client can be passed a custom hyper::client::connect::Connect here:

pub fn build<C, B>(&self, connector: C) -> Client<C, B> where C: Connect + Clone, B: HttpBody + Send, B::Data: Send,

If we look at

impl<S, T> Connect for S
where
    S: Service<Uri, Response = T> + Send + 'static,
    S::Error: Into<Box<dyn StdError + Send + Sync>>,
    S::Future: Unpin + Send,
    T: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static,

I think Service is something that can be called and polled to get the request: hyper::service::Service

but what about T?

I'm confused: is the T the same as the B in pub fn build<C, B>(&self, connector: C) -> Client<C, B>? And where is this B passed?

So how should I implement Connect?

I did

use hyper::http::{Request, Response, StatusCode};
use hyper::service::Service;
use core::task::{Context, Poll};
use core::future::Future;
use std::pin::Pin;

#[derive(Clone)]
pub struct CustomTransporter;

impl CustomTransporter {
    pub fn new() -> CustomTransporter {
        CustomTransporter{}
    }
}

impl Service<hyper::Uri> for CustomTransporter {
    type Response = Response<Vec<u8>>;
    type Error = hyper::http::Error;
    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>>>>;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    fn call(&mut self, req: hyper::Uri) -> Self::Future {
        // create the body
        let body: Vec<u8> = "hello, world!\n"
            .as_bytes()
            .to_owned();
        // Create the HTTP response
        let resp = Response::builder()
            .status(StatusCode::OK)
            .body(body)
            .expect("Unable to create `http::Response`");
         
        // create a response in a future.
        let fut = async {
            Ok(resp)
        };

        // Return the response as an immediate future
        Box::pin(fut)
    }
}

But then, with

let connector = CustomTransporter::new();
let client = Client::builder().build(connector);

I get this on the build call:

the trait bound `hyper::Response<std::vec::Vec<u8>>: tokio::io::AsyncRead` is not satisfied

the trait `tokio::io::AsyncRead` is not implemented for `hyper::Response<std::vec::Vec<u8>>`

note: required because of the requirements on the impl of `hyper::client::connect::Connect` for `custom_req::CustomTransporter` rustc(E0277)

But I think I should implement Service<Request<Vec<u8>>> instead.

The type of CustomTransporter::Response needs to be something that implements the AsyncRead trait.

If you just want to return a single vector, you can wrap the vector in a std::io::Cursor, which implements the required trait.
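As a rough illustration of what the Cursor buys you, here is a std-only sketch. It uses the synchronous std::io::Read as a stand-in for tokio's AsyncRead, and the helper name read_all_from_cursor is made up for this example.

```rust
use std::io::{Cursor, Read};

/// Hypothetical helper: wraps `bytes` in a Cursor and reads them back
/// through the `Read` trait, the way a stream consumer would.
pub fn read_all_from_cursor(bytes: Vec<u8>) -> std::io::Result<Vec<u8>> {
    // The Cursor keeps a position into the vector, which is what turns
    // a plain buffer into something stream-like.
    let mut cursor = Cursor::new(bytes);
    let mut out = Vec::new();
    cursor.read_to_end(&mut out)?;
    Ok(out)
}
```

Calling read_all_from_cursor(b"hello, world!\n".to_vec()) hands back the same bytes, read through the trait rather than taken from the vector directly.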

Note that when posting error messages, please run cargo build to get the full error message rather than the IDE popups.

Here's my custom transporter with a Response of type std::io::Cursor:

use hyper::http::{Request, Response, StatusCode};
use hyper::service::Service;
use hyper::client::connect::Connect;
use core::task::{Context, Poll};
use core::future::Future;
use std::pin::Pin;
use std::io::Cursor;

#[derive(Clone)]
pub struct CustomTransporter;

impl CustomTransporter {
    pub fn new() -> CustomTransporter {
        CustomTransporter{}
    }
}

impl Service<hyper::Uri> for CustomTransporter {
    type Response = Cursor<std::vec::Vec<u8>>;
    type Error = hyper::http::Error;
    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>>>>;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    fn call(&mut self, req: hyper::Uri) -> Self::Future {
        // create the body
        let body: Vec<u8> = "hello, world!\n"
            .as_bytes()
            .to_owned();
        // Create the HTTP response
        let resp = Cursor::new(body.to_vec());
         
        // create a response in a future.
        let fut = async {
            Ok(resp)
        };

        // Return the response as an immediate future
        Box::pin(fut)
    }
}

Then I call like this:

let connector = CustomTransporter::new();
let client = Client::builder().build(connector);
let mut res = client.get(url).await?;

and I get

error[E0277]: the trait bound `std::io::Cursor<Vec<u8>>: hyper::client::connect::Connection` is not satisfied
  --> src/main.rs:42:36
   |
42 |     let client = Client::builder().build(connector);
   |                                    ^^^^^ the trait `hyper::client::connect::Connection` is not implemented for `std::io::Cursor<Vec<u8>>`
   |
   = note: required because of the requirements on the impl of `hyper::client::connect::Connect` for `CustomTransporter`

Looking at the Connect trait (hyper::client::connect::Connect), I'm confused as to what is needed.

I tried simply writing

impl Connect for CustomTransporter {
    
}

But then I get a conflict with the other impl.
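The conflict most likely comes from the blanket impl quoted at the top: anything that is a suitable Service<Uri> already is a Connect, so a second, hand-written impl overlaps with it. Below is a small std-only sketch of that mechanism. Uri, Service, Connection, Connect, Loopback, MyConnector and accepts_connector are all simplified stand-ins invented for this example, not hyper's real definitions, and the synchronous Read/Write traits stand in for AsyncRead/AsyncWrite.

```rust
use std::io::{self, Cursor, Read, Write};

// Hypothetical stand-ins, NOT hyper's real types or traits.
pub struct Uri(pub String);

pub trait Service<Req> {
    type Response;
    fn call(&mut self, req: Req) -> Self::Response;
}

pub trait Connection {}
pub trait Connect {}

// Blanket impl: every Service<Uri> whose Response is a readable, writable
// Connection is a Connect automatically. T is the connection type.
impl<S, T> Connect for S
where
    S: Service<Uri, Response = T>,
    T: Read + Write + Connection,
{
}

/// Toy connection that simply wraps an in-memory buffer.
pub struct Loopback(Cursor<Vec<u8>>);

impl Read for Loopback {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        self.0.read(buf)
    }
}

impl Write for Loopback {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.0.write(buf)
    }
    fn flush(&mut self) -> io::Result<()> {
        self.0.flush()
    }
}

impl Connection for Loopback {}

pub struct MyConnector;

impl Service<Uri> for MyConnector {
    type Response = Loopback;
    fn call(&mut self, _uri: Uri) -> Loopback {
        Loopback(Cursor::new(b"pong".to_vec()))
    }
}

// Uncommenting the next line is rejected with E0119 (conflicting
// implementations): MyConnector already gets Connect from the blanket impl.
// impl Connect for MyConnector {}

/// Accepts only Connect types, the way a builder's `build` would.
pub fn accepts_connector<C: Connect>(_connector: &C) -> bool {
    true
}
```

In this sketch the only way to become a Connect is to satisfy the bounds of the blanket impl, which is why the fix has to happen on the Response type rather than on the connector itself.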

I implemented Connection for CustomTransporter, like this:

impl Connection for CustomTransporter {
    fn connected(&self) -> Connected {
        Connected::new()
    }
}

but then I get:

error[E0277]: the trait bound `std::io::Cursor<Vec<u8>>: hyper::client::connect::Connection` is not satisfied
  --> src/main.rs:42:36
   |
42 |     let client = Client::builder().build(connector);
   |                                    ^^^^^ the trait `hyper::client::connect::Connection` is not implemented for `std::io::Cursor<Vec<u8>>`

And I cannot implement Connection for Cursor<Vec<u8>>.

You can write your own wrapper struct around a Cursor<Vec<u8>>, then implement Connection for your custom struct.
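This is the usual newtype pattern: you may not implement a foreign trait for a foreign type, but you may implement it for a local struct that wraps that type. A minimal std-only sketch follows, in which std::fmt::Display plays the role of the foreign trait (standing in for hyper's Connection) and CursorConn is a hypothetical name.

```rust
use std::fmt;
use std::io::{self, Cursor, Read};

/// Local newtype around the foreign type Cursor<Vec<u8>>.
pub struct CursorConn {
    inner: Cursor<Vec<u8>>,
}

impl CursorConn {
    pub fn new(bytes: Vec<u8>) -> Self {
        Self { inner: Cursor::new(bytes) }
    }
}

// Allowed: the trait is foreign, but the type is local to this crate.
// `impl fmt::Display for Cursor<Vec<u8>>` would be rejected instead.
impl fmt::Display for CursorConn {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "CursorConn({} bytes)", self.inner.get_ref().len())
    }
}

// The wrapper does not inherit the inner type's trait impls, so anything
// still needed (here Read) has to be forwarded by hand.
impl Read for CursorConn {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        self.inner.read(buf)
    }
}
```

The cost of the pattern is the forwarding boilerplate: every trait the inner type provided has to be re-implemented on the wrapper.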

Nice. I just did that, take a look please:

use hyper::http::{Request, Response, StatusCode};
use hyper::service::Service;
use hyper::client::connect::Connect;
use core::task::{Context, Poll};
use core::future::Future;
use std::pin::Pin;
use std::io::Cursor;
use hyper::client::connect::{Connection, Connected};
use tokio::io::{AsyncRead, AsyncWrite};

#[derive(Clone)]
pub struct CustomTransporter;

unsafe impl Send for CustomTransporter {}

impl CustomTransporter {
    pub fn new() -> CustomTransporter {
        CustomTransporter{}
    }
}

impl Connection for CustomTransporter {
    fn connected(&self) -> Connected {
        Connected::new()
    }
}

struct CustomResponse {
    w: Cursor<Vec<u8>>
}

impl Connection for CustomResponse {
    fn connected(&self) -> Connected {
        Connected::new()
    }
}

impl AsyncRead for CustomResponse {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut tokio::io::ReadBuf<'_>
    ) -> Poll<std::io::Result<()>> {
        self.w.poll_read(cx, buf)
    }
}

impl AsyncWrite for CustomResponse {
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8]
    ) -> Poll<Result<usize, std::io::Error>>{
        // Forward the write to the inner cursor.
        self.w.poll_write(cx, buf)
    }
    fn poll_flush(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>
    ) -> Poll<Result<(), std::io::Error>> {
        self.w.poll_flush(cx)
    }

    fn poll_shutdown(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>
    ) -> Poll<Result<(), std::io::Error>>
    {
        self.w.poll_shutdown(cx)
    }
}


impl Service<hyper::Uri> for CustomTransporter {
    type Response = CustomResponse;
    type Error = hyper::http::Error;
    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>>>>;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    fn call(&mut self, req: hyper::Uri) -> Self::Future {
        // create the body
        let body: Vec<u8> = "hello, world!\n"
            .as_bytes()
            .to_owned();
        // Create the HTTP response
        let resp = CustomResponse{
            w: Cursor::new(body.to_vec())
        };
         
        // create a response in a future.
        let fut = async {
            Ok(resp)
        };

        // Return the response as an immediate future
        Box::pin(fut)
    }
}

It asked me for AsyncWrite also. I don't understand why the Response must implement AsyncWrite, since it's supposed to be read, not written to. Do you have an idea?
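One way to read the AsyncWrite requirement: the Response of a connector is not the HTTP response but the connection itself, something like a TCP socket. The client writes the serialized request into it and then reads the response bytes back out, so it has to work in both directions. Here is a std-only sketch of such a two-way mock, using synchronous Read/Write in place of the async traits. DuplexMock and its field names are made up for this example.

```rust
use std::io::{self, Cursor, Read, Write};

/// Hypothetical two-way mock connection with separate buffers per direction.
pub struct DuplexMock {
    /// Canned bytes that the pretend server sends to the client.
    to_client: Cursor<Vec<u8>>,
    /// Everything the client has written so far (the serialized request).
    from_client: Vec<u8>,
}

impl DuplexMock {
    pub fn new(canned: &[u8]) -> Self {
        Self {
            to_client: Cursor::new(canned.to_vec()),
            from_client: Vec::new(),
        }
    }

    /// Bytes the client wrote, useful for inspecting the request.
    pub fn written(&self) -> &[u8] {
        &self.from_client
    }
}

impl Read for DuplexMock {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        // Reads only ever see the canned server bytes.
        self.to_client.read(buf)
    }
}

impl Write for DuplexMock {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        // Writes are recorded separately and never touch the canned bytes.
        self.from_client.extend_from_slice(buf);
        Ok(buf.len())
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}
```

The canned bytes in such a mock would need to be a complete HTTP response (status line, headers, blank line, body), because the client parses whatever it reads from the connection as HTTP.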

Anyway, I run into problems forwarding all these calls to std::io::Cursor:

error[E0599]: no method named `poll_flush` found for struct `std::io::Cursor<Vec<u8>>` in the current scope
  --> src/custom_req.rs:61:16
   |
61 |         self.w.poll_flush(cx)
   |                ^^^^^^^^^^ method not found in `std::io::Cursor<Vec<u8>>`

error[E0599]: no method named `poll_shutdown` found for struct `std::io::Cursor<Vec<u8>>` in the current scope
  --> src/custom_req.rs:69:16
   |
69 |         self.w.poll_shutdown(cx)
   |                ^^^^^^^^^^^^^ method not found in `std::io::Cursor<Vec<u8>>`

Why can't I call these functions, if std::io::Cursor does implement them?

These methods take self: Pin<&mut Self> rather than &mut self, so to call them you need to do Pin::new(&mut self.w).poll_flush(cx).
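The same situation can be reproduced with std alone, since Future::poll also takes self: Pin<&mut Self>. The sketch below uses a hypothetical Wrapper around std::future::Ready (which is Unpin, as Cursor<Vec<u8>> is) and forwards poll to the field. NoopWake and poll_once are helpers invented for the example.

```rust
use std::future::{ready, Future, Ready};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical wrapper whose only field is an Unpin future.
pub struct Wrapper {
    inner: Ready<u32>,
}

impl Future for Wrapper {
    type Output = u32;

    // `mut self` is needed so `&mut self.inner` can borrow through the Pin.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u32> {
        // `self.inner.poll(cx)` would fail with E0599, because `poll`
        // expects a `Pin<&mut Ready<u32>>` receiver, not `&mut Ready<u32>`.
        // Pin::new is fine here because the field type is Unpin.
        Pin::new(&mut self.inner).poll(cx)
    }
}

/// Waker that does nothing; enough for a future that is ready at once.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Builds a Wrapper around `value` and polls it a single time.
pub fn poll_once(value: u32) -> Poll<u32> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut wrapper = Wrapper { inner: ready(value) };
    Pin::new(&mut wrapper).poll(&mut cx)
}
```

Note the mut self in the receiver: without it, borrowing the field mutably through the Pin is rejected, which is one of the follow-up errors you tend to hit right after adding Pin::new.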

use hyper::http::{Request, Response, StatusCode};
use hyper::service::Service;
use hyper::client::connect::Connect;
use core::task::{Context, Poll};
use core::future::Future;
use std::pin::Pin;
use std::io::Cursor;
use hyper::client::connect::{Connection, Connected};
use tokio::io::{AsyncRead, AsyncWrite};

#[derive(Clone)]
pub struct CustomTransporter;

unsafe impl Send for CustomTransporter {}

impl CustomTransporter {
    pub fn new() -> CustomTransporter {
        CustomTransporter{}
    }
}

impl Connection for CustomTransporter {
    fn connected(&self) -> Connected {
        Connected::new()
    }
}

struct CustomResponse {
    w: Cursor<Vec<u8>>
}

unsafe impl Send for CustomResponse {
    
}

impl Connection for CustomResponse {
    fn connected(&self) -> Connected {
        Connected::new()
    }
}

impl AsyncRead for CustomResponse {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut tokio::io::ReadBuf<'_>
    ) -> Poll<std::io::Result<()>> {
        Pin::new(&mut self.w).poll_read(cx, buf)
    }
}

impl AsyncWrite for CustomResponse {
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8]
    ) -> Poll<Result<usize, std::io::Error>>{
        // Forward the write to the inner cursor.
        Pin::new(&mut self.w).poll_write(cx, buf)
    }
    fn poll_flush(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>
    ) -> Poll<Result<(), std::io::Error>> {
        Pin::new(&mut self.w).poll_flush(cx)
    }

    fn poll_shutdown(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>
    ) -> Poll<Result<(), std::io::Error>>
    {
        Pin::new(&mut self.w).poll_shutdown(cx)
    }
}


impl Service<hyper::Uri> for CustomTransporter {
    type Response = CustomResponse;
    type Error = hyper::http::Error;
    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>>>>;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    fn call(&mut self, req: hyper::Uri) -> Self::Future {
        // create the body
        let body: Vec<u8> = "hello, world!\n"
            .as_bytes()
            .to_owned();
        // Create the HTTP response
        let resp = CustomResponse{
            w: Cursor::new(body.to_vec())
        };
         
        // create a response in a future.
        let fut = async {
            Ok(resp)
        };

        // Return the response as an immediate future
        Box::pin(fut)
    }
}

I get

error[E0277]: `(dyn Future<Output = std::result::Result<CustomResponse, hyper::http::Error>> + 'static)` cannot be sent between threads safely
  --> src/main.rs:42:36
   |
42 |     let client = Client::builder().build(connector);
   |                                    ^^^^^ `(dyn Future<Output = std::result::Result<CustomResponse, hyper::http::Error>> + 'static)` cannot be sent between threads safely
   |

I cannot simply implement Send for the Future, and I cannot replace the Future with a wrapper.

Here's the project, if anyone is interested in building it to try: lzunsec/rust_hyper_custom_transporter on GitHub, at commit 33e67c5db2db1462580a072893345027b634fe5a

Use a dyn Future<...> + Send instead, or the BoxFuture alias in the futures crate.
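A trait object only carries the auto traits that are written in its type, so a plain dyn Future is not Send even when the async block behind it is. A small std-only sketch of the difference: SendBoxFuture is a hypothetical alias similar in shape to the futures crate's BoxFuture (minus its lifetime parameter), and Outcome, make_future, NoopWake and poll_on_other_thread are names made up for the example.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread;

/// Hypothetical alias: a boxed, pinned future that may cross threads.
pub type SendBoxFuture<T> = Pin<Box<dyn Future<Output = T> + Send>>;

pub type Outcome = Result<Vec<u8>, String>;

/// Boxes an async block. The `+ Send` in the alias is checked here:
/// the block captures only a Vec<u8>, which is Send.
pub fn make_future(body: Vec<u8>) -> SendBoxFuture<Outcome> {
    Box::pin(async move { Ok::<Vec<u8>, String>(body) })
}

/// Waker that does nothing; enough for a future with no await points.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Moves the future to another thread and polls it once there.
/// thread::spawn requires a Send closure, so dropping `+ Send` from the
/// alias makes this function fail to compile.
pub fn poll_on_other_thread(mut fut: SendBoxFuture<Outcome>) -> Poll<Outcome> {
    thread::spawn(move || {
        let waker = Waker::from(Arc::new(NoopWake));
        let mut cx = Context::from_waker(&waker);
        fut.as_mut().poll(&mut cx)
    })
    .join()
    .expect("polling thread panicked")
}
```

The same reasoning applies to the Service impl in the question: the associated Future type is what has to say + Send, because that type is all the client's bounds can see.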

Thanks. After doing that and some other corrections, I got it to compile:

use hyper::service::Service;
use core::task::{Context, Poll};
use core::future::Future;
use std::pin::Pin;
use std::io::Cursor;
use hyper::client::connect::{Connection, Connected};
use tokio::io::{AsyncRead, AsyncWrite};

#[derive(Clone)]
pub struct CustomTransporter;

// No `unsafe impl Send` is needed: a unit struct is Send automatically.

impl CustomTransporter {
    pub fn new() -> CustomTransporter {
        CustomTransporter{}
    }
}

impl Connection for CustomTransporter {
    fn connected(&self) -> Connected {
        Connected::new()
    }
}

pub struct CustomResponse {
    w: Cursor<Vec<u8>>
}

// No `unsafe impl Send` is needed: Cursor<Vec<u8>> is already Send,
// so CustomResponse is Send as well.

impl Connection for CustomResponse {
    fn connected(&self) -> Connected {
        println!("connected");
        Connected::new()
    }
}

impl AsyncRead for CustomResponse {
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut tokio::io::ReadBuf<'_>
    ) -> Poll<std::io::Result<()>> {
        println!("poll_read");
        Pin::new(&mut self.w).poll_read(cx, buf)
    }
}

impl AsyncWrite for CustomResponse {
    fn poll_write(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8]
    ) -> Poll<Result<usize, std::io::Error>>{
        //let v = vec!();
        Pin::new(&mut self.w).poll_write(cx, buf)
    }
    fn poll_flush(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>
    ) -> Poll<Result<(), std::io::Error>> {
        Pin::new(&mut self.w).poll_flush(cx)
    }

    fn poll_shutdown(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>
    ) -> Poll<Result<(), std::io::Error>>
    {
        Pin::new(&mut self.w).poll_shutdown(cx)
    }
}


impl Service<hyper::Uri> for CustomTransporter {
    type Response = CustomResponse;
    type Error = hyper::http::Error;
    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        println!("poll_ready");
        Poll::Ready(Ok(()))
    }

    fn call(&mut self, req: hyper::Uri) -> Self::Future {
        println!("call");
        // create the body
        let body: Vec<u8> = "hello, world!\n"
            .as_bytes()
            .to_owned();
        // Create the HTTP response
        let resp = CustomResponse{
            w: Cursor::new(body)
        };
         
        // create a response in a future.
        let fut = async {
            Ok(resp)
        };
        println!("gonna return from call");
        // Return the response as an immediate future
        Box::pin(fut)
    }
}

If someone knows how to make the fake request work, it'd be awesome. I don't see anything I've done wrong.
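One possible explanation, offered as a guess that nothing in this thread confirms: the same Cursor serves as both the read side and the write side of the connection. If tokio's Cursor impls behave like the std ones, the client's request is written over the canned bytes and moves the position forward, so the following read finds nothing left. Separately, "hello, world!\n" on its own is not a complete HTTP response, since there is no status line or header block. The std-only sketch below shows the overwrite effect with synchronous Read/Write. write_then_read is a made-up helper.

```rust
use std::io::{Cursor, Read, Write};

/// Hypothetical helper: preloads a cursor with `canned`, writes `request`
/// into it, then reads whatever is left after the write.
/// Returns (bytes read back, final contents of the buffer).
pub fn write_then_read(
    canned: &[u8],
    request: &[u8],
) -> std::io::Result<(Vec<u8>, Vec<u8>)> {
    let mut cursor = Cursor::new(canned.to_vec());

    // Writing starts at position 0, overwrites the canned bytes, grows the
    // vector if the request is longer, and advances the position.
    cursor.write_all(request)?;

    // Reading continues from the position the write left behind.
    let mut read_back = Vec::new();
    cursor.read_to_end(&mut read_back)?;

    Ok((read_back, cursor.into_inner()))
}
```

With a request longer than the canned body, the read comes back empty and the buffer holds only the request, which would look like an immediately closed connection to an HTTP client. Keeping separate buffers for the two directions avoids this.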