Help Designing a Client-Service Controller with TcpStream Routing

"I'm working on a Controller that accepts client connections and routes their requests to the relevant services. My aim is to keep a HashMap (or a similar structure) that stores the TcpStream of each connected client. This lets microservices communicate with clients by sending messages to the Controller: using the client's ID as the key, the Controller looks up the corresponding TcpStream in the HashMap. However, I cannot reuse the stream once it has been moved into the `TokioIo` struct during proxying."

use std::convert::Infallible;
use std::future::Future;

use bytes::Bytes;
use central_proxy::gateway::GatewayService;
use central_proxy::gateway::{GatewayServer, Message};
use http_body_util::Full;
use hyper::body::Incoming;
use hyper::rt::Executor;
use hyper::service::service_fn;
use hyper::{Request, Response};
use hyper_util::rt::TokioIo;
use std::sync::{Arc, Mutex};
use tokio::net::TcpStream;
use tonic::body::BoxBody;
use tonic::transport::Server;
use tonic::Status;

use hyper::server::conn::http2;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let addr = "127.0.0.1:50051";

    let Ok(listerner) = tokio::net::TcpListener::bind(addr).await else {
        tracing::error!("failed to bind to address {addr}");
        std::process::exit(34);
    };

    let active_connections = Arc::new(Mutex::new(std::collections::HashMap::<
        String,
        Arc<Mutex<TokioIo<TcpStream>>>,
    >::new()));

    let active_connections_clone = active_connections.clone();
    tokio::task::spawn(async move {
        while let Ok((stream, _)) = listerner.accept().await {
            tracing::info!("connection received, processing");

            let io = TokioIo::new(stream);
            let io_arc = Arc::new(Mutex::new(io));

            let mut guard = active_connections_clone.lock().unwrap();
            guard.insert("IOT_101".into(), io_arc.clone()); // just Test, will extract ID from request

            // let executor = TokioExecutor::default();
            let executor = TokioExecutor;

            // this does not work since hyper::rt::Read + Write is not implemented
            http2::Builder::new(executor)
                .serve_connection(io_arc.lock().unwrap(), service_fn(proxy))
                .await;
        }
    });

    Ok(())
}

/// Zero-sized adapter that lets hyper run its background futures on the
/// tokio runtime.
#[derive(Clone, Default)]
struct TokioExecutor;

impl<F> Executor<F> for TokioExecutor
where
    F: Future + Send + 'static,
    F::Output: Send + 'static,
{
    /// Spawns `future` as a detached tokio task; the join handle is dropped
    /// immediately, so the output is never observed.
    fn execute(&self, future: F) {
        drop(tokio::task::spawn(future));
    }
}

async fn proxy(request: Request<Incoming>) -> Result<Response<Incoming>, hyper::Error> {
    dbg!(&request);
    let uri = request.uri().clone();
    let (service_name, method_name) = parse_grpc_path(uri.path());

    let body = request.into_body();
    dbg!(&body);

    match service_name.unwrap() {
        "test_service" => {
            let executor = TokioExecutor;
            let stream = tokio::net::TcpStream::connect("http://localhost:9090").await.unwrap();
            let io = TokioIo::new(stream);

            // perform TCP handshake
            let (mut sender, conn) = hyper::client::conn::http2::handshake(executor, io).await.unwrap();
            tokio::task::spawn(async move {
                if let Err(err) = conn.await {
                    tracing::error!("channel closed");
                }

                // tokio::io::copy_bidirectional(&mut sender, &mut incoming).await
            });

            sender.send_request(Request::new(body)).await
        }
        _ => {
            eprintln!("called");
            let response = Response::builder().body(body);
            Ok::<_, hyper::Error>(response.unwrap())
        }
    }

    // let response = Response::builder().body(Full::new(Bytes::from("Hello World"))).unwrap();
    // Ok(response)
}

/// Splits a request path of the form `/<service>/<method>` into its service
/// and method segments.
///
/// Everything up to and including the first `/` is discarded; the next two
/// `/`-separated segments are returned in order. A segment that is absent is
/// `None`, while a segment that is present but empty (as in `"/"`) is
/// `Some("")`. Segments beyond the second are ignored.
fn parse_grpc_path(path: &str) -> (Option<&str>, Option<&str>) {
    match path.split_once('/') {
        // No separator at all: there is nothing after the leading segment.
        None => (None, None),
        Some((_leading, rest)) => {
            let mut segments = rest.split('/');
            let service = segments.next();
            let method = segments.next();
            (service, method)
        }
    }
}

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.