Different port/TcpStream for REST and WebSocket?

I currently have a basic HTTP server with several routes. All of the routes return plain HTTP responses, except one that needs to upgrade the HTTP request to a WebSocket.

I have been trying to achieve this by passing the stream to the route and upgrading it there using accept_async.

The problem is that passing the stream along moves it, making it inaccessible to the rest of the application.

Simplified code:

use tokio_tungstenite::{ accept_async, tungstenite::protocol::Message, WebSocketStream, MaybeTlsStream};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};
use futures_util::{StreamExt, SinkExt};

#[tokio::main]
async fn main() {
    // Start server.
    let server = TcpListener::bind("127.0.0.1:8080").await.expect("Failed to bind to 127.0.0.1:8080");
    println!("Listening on 127.0.0.1:8080...");
   
    // Spawn process to handle incoming connections.
    loop {
        let (mut stream, _) = server.accept().await.unwrap();
        
        process_request("rofl", stream).await;
    }
}

async fn process_request(route_name: &str, mut stream: TcpStream) {

    let response = execute_route(route_name, stream).await;

    // BECAUSE WE PASSED stream TO THE ROUTE FUNCTIONS A FEW LINES ABOVE (ONE OF THEM
    // MIGHT WANT TO UPGRADE IT TO A WEBSOCKET), WE CAN'T USE stream
    // ANYMORE TO WRITE A REPLY... :(
  
    // Removing this part makes the code work again.
    // Return response
    stream.write_all(response.to_string().as_bytes()).await.unwrap();
    // Consume stream.
    let mut buffer = [0; 4096];
    stream.read(&mut buffer).await.unwrap();
}

async fn execute_route(name: &str, mut stream: TcpStream) -> String {
    let response = match name {
        "route1" => route1(&mut stream).await,
        "route2" => route2(&mut stream).await,
        _ => { return "404".into(); }
    };

    response
}

// Some routes
// ALL route functions need to have the same signature.
async fn route1(stream: &mut TcpStream) -> String {
    "Hello.".to_string()
}

async fn route2(stream: &mut TcpStream) -> String {
    // do some things with the WebSocketStream
    // Create websocket stream.
    let ws_stream = accept_async(stream).await.expect("Failed to accept");
    let (mut ws_sender, mut ws_receiver) = ws_stream.split();
    
    "Response.".to_string()
}

error[E0382]: borrow of moved value: `stream`
  --> src/algotester2.rs:30:5
   |
20 | async fn process_request(route_name: &str, mut stream: T...
   |                                            ---------- move occurs because `stream` has type `tokio::net::TcpStream`, which does not implement the `Copy` trait
21 |
22 |     let response = execute_route(route_name, stream).await;
   |                                              ------ value moved here
...
30 |     stream.write_all(response.to_string().as_bytes()).aw...
   |     ^^^^^^ value borrowed here after move

I have been stuck on this for several days. The simplified code can be restructured to make it work (remove the process_request function and write the response to the stream inside execute_route), but in my actual code that is not feasible.

Now I have the idea of simply listening for WebSocket connections on a different port.

    // Server for HTTP/REST API requests
    let server = TcpListener::bind("127.0.0.1:8080").await.expect("Failed to bind to 127.0.0.1:8080");
    println!("Listening on 127.0.0.1:8080...");
    
    // Server for Websocket API requests
    let ws_server = TcpListener::bind("127.0.0.1:8081").await.expect("Failed to bind to 127.0.0.1:8081");
    println!("Listening on 127.0.0.1:8081...");

Is this a good solution?

You may want to look at how other web frameworks handle this.

Axum has a similar structure to what you're working with, so that could be helpful.
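
For illustration, here is a rough sketch of one way to keep a single port and a single route signature: the routes never touch the stream, and instead return a value telling the dispatcher either "write this reply" or "hand me the connection". As far as I understand, this is roughly the idea behind Axum's WebSocketUpgrade extractor, whose on_upgrade takes a callback and returns an ordinary response. Everything below is an assumption made for the example, not code from your project: the `RouteOutcome` enum, the `SharedBuf` test writer and the `demo` helper are hypothetical names, it uses only std with a generic `Write` standing in for `TcpStream`, and it is synchronous. In a tokio server the upgrade callback would be async and is where `accept_async(stream)` would be called.

```rust
use std::cell::RefCell;
use std::io::{self, Write};
use std::rc::Rc;

/// What a route hands back to the dispatcher (hypothetical type for this sketch).
enum RouteOutcome<S> {
    /// A plain reply; the dispatcher writes it to the stream itself.
    Reply(String),
    /// The route wants to take over the connection, e.g. for a WebSocket handshake.
    Upgrade(Box<dyn FnOnce(S) -> io::Result<()>>),
}

// Every route has the same signature and never receives the stream directly.
fn route1<S: Write + 'static>() -> RouteOutcome<S> {
    RouteOutcome::Reply("Hello.".to_string())
}

fn route2<S: Write + 'static>() -> RouteOutcome<S> {
    // Placeholder body: a real server would run the WebSocket handshake here
    // (assumption: something like `accept_async(stream)` in async code).
    RouteOutcome::Upgrade(Box::new(|mut stream: S| stream.write_all(b"upgraded")))
}

fn execute_route<S: Write + 'static>(name: &str) -> RouteOutcome<S> {
    match name {
        "route1" => route1(),
        "route2" => route2(),
        _ => RouteOutcome::Reply("404".to_string()),
    }
}

// The dispatcher keeps ownership of the stream until it knows what the route
// wants, so the stream is moved at most once, and only in the upgrade case.
fn process_request<S: Write + 'static>(route_name: &str, mut stream: S) -> io::Result<()> {
    match execute_route::<S>(route_name) {
        RouteOutcome::Reply(body) => stream.write_all(body.as_bytes()),
        RouteOutcome::Upgrade(handler) => handler(stream),
    }
}

/// In-memory stand-in for a TcpStream so the sketch can be run without a socket.
#[derive(Clone, Default)]
struct SharedBuf(Rc<RefCell<Vec<u8>>>);

impl Write for SharedBuf {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.0.borrow_mut().extend_from_slice(buf);
        Ok(buf.len())
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

/// Runs one request against the in-memory stream and returns what was written.
fn demo(route_name: &str) -> String {
    let buf = SharedBuf::default();
    process_request(route_name, buf.clone()).expect("in-memory write cannot fail");
    let bytes = buf.0.borrow().clone();
    String::from_utf8(bytes).expect("routes only write UTF-8 in this sketch")
}
```

With this shape, `demo("route1")` writes the normal reply, `demo("route2")` hands the stream to the upgrade callback, and any unknown name falls through to the 404 reply, all on the same listener. A second port would also work, but it does not seem necessary just to get around the move error.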
