Axum/Hyper with Monoio

Question

I am trying to use Axum with Monoio (iouring) runtime. There is already a hyper server implementation for this, I would like to extend this to use axum instead of hyper.

// [dependencies]
// axum = "0.5.13"
// futures = "0.3.21"
// hyper = { version = "0.14.20", features = ["full"] }
// monoio = { git = "https://github.com/bytedance/monoio", branch = "master" }
// monoio-compat = { git = "https://github.com/bytedance/monoio", branch = "master" }
// tower = "0.4.13"

use axum::body::HttpBody;
use axum::extract::connect_info::IntoMakeServiceWithConnectInfo;
use axum::extract::ConnectInfo;
use axum::routing::*;
use axum::Router;
use futures::Future;
use hyper::service::Service;
use hyper::{server::conn::Http, service::service_fn};
use monoio::net::TcpListener;
use monoio_compat::TcpStreamCompat;
use std::net::SocketAddr;
use tower::MakeService;

#[derive(Clone)]
struct HyperExecutor;

impl<F> hyper::rt::Executor<F> for HyperExecutor
where
    F: Future + 'static,
    F::Output: 'static,
{
    fn execute(&self, fut: F) {
        monoio::spawn(fut);
    }
}

// using service function (just hyper)
// this works
pub(crate) async fn serve_http<S, F, R, A>(addr: A, service: S) -> std::io::Result<()>
where
    S: FnMut(Request<Body>) -> F + 'static + Copy,
    //S: Service<Request<Body>> ,
    F: Future<Output = Result<Response<Body>, R>> + 'static,
    R: std::error::Error + 'static + Send + Sync,
    A: Into<SocketAddr>,
{
    let listener = TcpListener::bind(addr.into())?;
    loop {
        let (stream, _) = listener.accept().await?;
        monoio::spawn(
            Http::new()
                .with_executor(HyperExecutor)
                .serve_connection(unsafe { TcpStreamCompat::new(stream) }, service_fn(service)), // <- making a service fn
        );
    }
}

use hyper::{Body, Method, Request, Response, StatusCode};

// using service function (just hyper)
async fn hyper_handler(req: Request<Body>) -> Result<Response<Body>, std::convert::Infallible> {
    match (req.method(), req.uri().path()) {
        (&Method::GET, "/") => Ok(Response::new(Body::from("Hello World!"))),
        (&Method::GET, "/monoio") => Ok(Response::new(Body::from("Hello Monoio!"))),
        _ => Ok(Response::builder()
            .status(StatusCode::NOT_FOUND)
            .body(Body::from("404 not found"))
            .unwrap()),
    }
}

async fn root() -> &'static str {
    "hi there"
}

#[monoio::main(threads = 4)]
async fn main() {
    let mut app = Router::new()
        .route("/", get(root))
        .into_make_service_with_connect_info::<SocketAddr>();

    println!("Running http server on 0.0.0.0:8000");
    let addr = ([0, 0, 0, 0], 8000);

    let listener = TcpListener::bind(addr.into()).unwrap();


    loop {
        let (stream, client_socket_addr) = listener.accept().await.unwrap();
        let stream = unsafe { TcpStreamCompat::new(stream) };
        let app = app.make_service(&stream).await.unwrap(); // error here because stream is not AddrStream
        monoio::spawn(
            Http::new()
                .with_executor(HyperExecutor)
                .serve_connection(stream, app),
        );
    }
    // type mismatch resolving `<IntoMakeServiceWithConnectInfo<Router<_>, std::net::SocketAddr> as Service<Request<Body>>>::Response == Response<_>`
    // expected struct `AddExtension<Router<_>, ConnectInfo<std::net::SocketAddr>>`
    // found struct `Response<_>`
    // required because of the requirements on the impl of `hyper::service::http::HttpService<Body>` for `IntoMakeServiceWithConnectInfo<Router<_>, std::net::SocketAddr>`
    // required because of the requirements on the impl of `std::future::Future` for `hyper::server::conn::Connection<TcpStreamCompat, IntoMakeServiceWithConnectInfo<Router<_>, std::net::SocketAddr>, HyperExecutor>`

    // this one works
    //let _ = serve_http(([0, 0, 0, 0], 8000), hyper_handler).await;
}

Using hyper_handler with service_fn works. But, I tried using axum's routers and method handlers, it was not successful.

  1. app.make_service doesn't work with just TcpStream (basically anything that impls AsyncRW + Unpin, but only works with AddrStream and unfortunately it has no public constructor, and in hyper 1.0 this will be removed entirely
  2. Tried with make_service_fn from hyper, still requires AddrStream
  3. Tried using executor method on axum::Server panics. (as it should)
Running http server on 0.0.0.0:8000
thread 'main' panicked at 'there is no reactor running, must be called from the context of a Tokio 1.x runtime', /home/user/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.20.0/src/runtime/context.rs:30:26
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

Ideal API should be

let app = Router::new().route("/", get(root)).into_make_service();
serve_http(..., app).await;

Do you have any ideas how to implement this? Or have some ability to have custom executor perhaps withaxum::Server.

An example that works with hyper::Server is here

Requires nightly and io-uring linux

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.