AsyncReadExt::read makes connection fail

Hello

I am trying to accept a websocket connection on my server. The problem I kept encountering is that the websocket kept saying "Handshake incomplete". I did not find the problem until now. I made a simplified version of my code.

use futures_util::{SinkExt, StreamExt};

use log::*;
use std::{net::SocketAddr, time::Duration};
use tokio::net::{TcpListener, TcpStream};
use tokio_tungstenite::{
    accept_async,
    tungstenite::{ Message, Result},
};
use tokio::io::{AsyncReadExt, AsyncWriteExt};

use std::error::Error;

async fn process(stream: &mut TcpStream) -> Result<(), Box<dyn Error>> {
    // Commenting this block out makes the connection work again
    let mut buffer = [0; 1024];
    if let Err(e) = stream.read(&mut buffer).await {
        eprintln!("Error: {}", e);


        return Ok(());
    }
    /////////////////////////////////////////////////////////

    goto(stream).await;
    Ok(())
}

async fn goto(stream: &mut TcpStream) -> Result<(), String> {
    websocket(stream).await
}

async fn websocket(stream: &mut TcpStream) -> Result<(), String> {
    let ws_stream = accept_async(stream).await.expect("Failed to accept");
    let (mut ws_sender, mut ws_receiver) = ws_stream.split();
    let mut interval = tokio::time::interval(Duration::from_millis(1000));

    // Echo incoming WebSocket messages and send a message periodically every second.
    loop {
        tokio::select! {
            msg = ws_receiver.next() => {
                match msg {
                    Some(msg) => {
                        let msg = msg.unwrap();
                        if msg.is_text() ||msg.is_binary() {
                            ws_sender.send(msg).await.unwrap();
                        } else if msg.is_close() {
                            break;
                        }
                    }
                    None => break,
                }
            }
            _ = interval.tick() => {
                ws_sender.send(Message::Text("tick".to_owned())).await.unwrap();
            }
        }
    }

    Ok(())

}


#[tokio::main]
async fn main() {

    let addr = "127.0.0.1:9002";
    let listener = TcpListener::bind(&addr).await.expect("Can't listen");
    info!("Listening on: {}", addr);

    loop {
        let (mut stream, _) = listener.accept().await.unwrap();

        tokio::spawn(async move {
            if let Err(e) = process(&mut stream).await {
                println!("Error {}", e);
            }
        });
    }


}

The piece of code that gives a problem is stream.read(). Removing that piece of code makes the connection and server work perfectly fine.

How can I solve this?

It seems like a solution is to use stream.peek() instead of stream.read(). The last one consumes the data resulting in the websocket not having a valid TcpStream to initiate a websocket with. peek() just reads the data without consuming it.

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.