Keep WebSocket connection open?

Hello

I have the following code to make a simple WebSocket-client:

use actix_rt::System;
use awc::{Client, ws::{self, Frame, Message}};
use bytes::Bytes;
use futures_util::stream::StreamExt;
use futures_util::sink::SinkExt;

/// Entry point: opens a WebSocket connection, sends one greeting, then prints
/// every text frame the server delivers.
///
/// The receive loop stops as soon as the stream ends or yields an error, after
/// which `main` returns and the process exits.
#[actix::main]
async fn main() {
    // Perform the WebSocket handshake (point the URL at your own server).
    let (handshake, mut connection) = awc::Client::new()
        .ws("ws://127.0.0.1:8080/ws")
        .connect()
        .await
        .expect("Failed to connect to WebSocket server");

    println!("Connected to: {:?}", handshake);

    // Greet the server with a single text message.
    let greeting = ws::Message::Text("Hello, WebSocket Server!".into());
    connection
        .send(greeting)
        .await
        .expect("Failed to send message");

    // Pull frames until the stream is exhausted or reports an error.
    loop {
        match connection.next().await {
            Some(Ok(ws::Frame::Text(text))) => println!("Received: {:?}", text),
            // Every other frame kind is ignored.
            Some(Ok(_)) => {}
            // `None` (stream ended) or `Some(Err(_))`: stop receiving.
            _ => break,
        }
    }
}

It seems to be working: I get the "Connected to" output. But the program terminates by itself after a few seconds. How can I keep the connection open and keep waiting for responses?

You need to play ping-pong with the server to keep the connection alive: reply to its Ping frames with Pong frames. See "Writing WebSocket servers" in the MDN Web APIs documentation.

1 Like

I tried this code now:

use actix_rt::System;
use awc::{
    ws::{self, Frame, Message},
    Client,
};
use bytes::Bytes;
use futures_util::stream::StreamExt;
use futures_util::sink::SinkExt;
use std::time::Duration;
use tokio::time::interval;

/// Second attempt: connects, sends one greeting, then waits on the socket and a
/// 10-second timer together with `tokio::select!`.
///
/// NOTE(review): no Ping frame is ever sent here. The timer branch only prints
/// "none" and evaluates to `None`, and that `None` is what makes the
/// `while let Some(frame)` loop stop.
#[actix_rt::main]
async fn main() {
    // Connect to the WebSocket server
    let (response, mut framed) = Client::new()
        .ws("ws://127.0.0.1:8080/ws") // Replace with your WebSocket server URL
        .connect()
        .await
        .expect("Failed to connect to WebSocket server");

    println!("Connected to: {:?}", response);

    // Send a message to the server
    let message = ws::Message::Text("Hello, WebSocket Server!".into());
    framed.send(message).await.expect("Failed to send message");

    // Create a periodic ping interval
    // NOTE(review): per the tokio documentation the first `tick()` of an
    // `interval` completes immediately, which is presumably why the loop below
    // ends right away — confirm against the tokio version in use.
    let mut ping_interval = interval(Duration::from_secs(10)); // Send a ping every 10 seconds

    // Receive and print messages from the server
    // The value of the `select!` expression is the loop condition: a `Some`
    // keeps the loop running, a `None` terminates it.
    while let Some(frame) = tokio::select! {
        // Wait for either a received frame or the ping interval
        Some(frame) = framed.next() => {Some(frame)},
        // Timer fired: nothing is sent to the server; `None` ends the loop.
        _ = ping_interval.tick() => { println!("none"); None },
    } {
        match frame {
            Ok(ws::Frame::Text(text)) => {
                println!("Received: {:?}", text);
            }
            Ok(ws::Frame::Pong(_)) => {
                println!("Received Pong from server");
            }
            // Any other frame kind, and any `Err`, ends up here.
            _ => {
                println!("No matching frame");
                ()
            },
        }
    }
}

But the while-loop instantly ends because frame is None. Why? Should I return a frame from the server?

Here is working code using tokio-tungstenite:

use std::env;

use futures_util::{future, pin_mut, StreamExt};
use futures::SinkExt;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio_tungstenite::{connect_async, tungstenite::protocol::Message};
use tokio::sync::mpsc;

/// Connects to the WebSocket server, forwards every chunk read from stdin to it
/// as a text message, and writes the payload of every received message to
/// stdout.
///
/// `main` returns as soon as either direction finishes: the stdin channel is
/// closed, sending fails, the server stream ends or reports an error, or stdout
/// can no longer be written. Only a failed initial connection panics.
#[tokio::main]
async fn main() {
    let url = url::Url::parse("wss://127.0.0.1:8080/ws").expect("hard-coded URL is valid");

    // stdin is read on its own task and handed over through a channel, so that
    // waiting for keyboard input never stalls the socket.
    let (stdin_tx, mut stdin_rx) = mpsc::unbounded_channel();
    tokio::spawn(read_stdin(stdin_tx));

    let (ws_stream, _) = connect_async(url).await.expect("Failed to connect");
    println!("WebSocket handshake has been successfully completed");

    let (mut write, mut read) = ws_stream.split();

    // stdin -> WebSocket
    let stdin_to_ws = async {
        while let Some(message) = stdin_rx.recv().await {
            // A failed send ends this direction instead of panicking.
            if let Err(err) = write.send(message).await {
                eprintln!("Failed to send message to WebSocket: {}", err);
                break;
            }
        }
    };

    // WebSocket -> stdout
    let ws_to_stdout = async {
        // One stdout handle for the whole loop rather than a new one per message.
        let mut stdout = tokio::io::stdout();
        while let Some(message) = read.next().await {
            println!("{:?}", message);
            let data = match message {
                Ok(message) => message.into_data(),
                // The error was already printed above; stop reading instead of
                // panicking on `unwrap()`.
                Err(_) => break,
            };
            if let Err(err) = stdout.write_all(&data).await {
                eprintln!("Failed to write to stdout: {}", err);
                break;
            }
        }
    };

    // Run both directions concurrently; whichever finishes first ends the program.
    tokio::select! {
        _ = stdin_to_ws => (),
        _ = ws_to_stdout => (),
    };
}

/// Reads stdin in chunks of up to 1024 bytes and forwards each chunk through
/// `tx` as a WebSocket text message.
///
/// Returns on end of input, on a read error, or once the receiving half of the
/// channel has been dropped.
async fn read_stdin(tx: mpsc::UnboundedSender<Message>) {
    let mut stdin = tokio::io::stdin();
    // A single buffer is reused for every read; only the decoded text is allocated.
    let mut buf = [0u8; 1024];
    loop {
        let n = match stdin.read(&mut buf).await {
            Err(_) | Ok(0) => break,
            Ok(n) => n,
        };
        // stdin may carry arbitrary bytes, and a read can stop in the middle of a
        // multi-byte character, so decode lossily instead of panicking on
        // invalid UTF-8.
        let text = String::from_utf8_lossy(&buf[..n]).into_owned();
        // `send` only fails when the receiver is gone; nobody is listening any
        // more, so stop reading.
        if tx.send(Message::text(text)).is_err() {
            break;
        }
    }
}


Cargo.toml

# NOTE(review): "*" accepts any published version, so builds are not
# reproducible; consider pinning the versions this was tested with.
[dependencies]
futures-util = "*"
futures = "*"
# NOTE(review): `bytes` and `lazy_static` do not appear to be referenced by the
# tokio-tungstenite snippet — confirm before keeping them.
bytes = "1.5.0"
tokio = { version = "*", features = ["full"] }
# NOTE(review): `native-tls` is presumably enabled because the snippet connects
# to a `wss://` URL — confirm.
tokio-tungstenite = { version = "*", features = ["native-tls"] }
lazy_static="*"
url = "*"

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.