Async Polling Clients (w/ Tokio-Tungstenite, WebSockets)

I'm trying to develop a function that will go through a list of connected websocket clients and poll them. For example: poll client 1, wait for response, process it -> poll client 2, wait for response... -> final client.

The issue I'm encountering with the following code is that, with one connected client, the unbounded_send doesn't seem to fire. With two connected clients, it only polls one client per loop. I did notice that the earlier sends that didn't seem to fire with one connected user all finally go through once the second client connects.

Here's the code I have; the relevant part is the take_turn function:

use std::{env, io::Error};
use std::io;
use std::io::Write;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

use text_io::read;

// use futures_util::StreamExt;
use log::info;
use tokio::net::{TcpListener, TcpStream};

use futures::{
    channel::mpsc::{unbounded, UnboundedSender},
    future, pin_mut,
    stream::TryStreamExt,
    StreamExt,
};
use tungstenite::protocol::Message;

type Tx = UnboundedSender<Message>;

#[tokio::main]
async fn main() -> Result<(), Error> {

    let mut users = Arc::new(Mutex::new(HashMap::new()));

    let _ = env_logger::try_init();
    let addr = env::args()
        .nth(1)
        .unwrap_or_else(|| "127.0.0.1:8080".to_string());

    // Create the event loop and TCP listener we'll accept connections on.
    let try_socket = TcpListener::bind(&addr).await;
    let mut listener = try_socket.expect("Failed to bind");
    info!("Listening on: {}", addr);

    tokio::spawn(take_turn(users.clone()));

    while let Ok((stream, _)) = listener.accept().await {
        tokio::spawn(accept_connection(stream, users.clone()));
    }

    Ok(())
}

async fn take_turn(users: Arc<Mutex<HashMap<String, Tx>>>) {

    loop {
        // println!("-------");
        println!("\n");
        print!("\r");
        std::io::stdout().flush().unwrap();
        println!("poll users: ");
        std::io::stdout().flush().unwrap();
        let cont: String = read!();

        if cont == "y".to_string() {
            for (name, tx) in users.lock().unwrap().iter() {
                println!("sending to: {}", name);
                tx.unbounded_send(Message::Text("guess".to_string()));
            }

        }
    }
}

async fn accept_connection(stream: TcpStream, users: Arc<Mutex<HashMap<String, Tx >>>) {
    let addr = stream
        .peer_addr()
        .expect("connected streams should have a peer address");
    info!("Peer address: {}", addr);

    let ws_stream = tokio_tungstenite::accept_async(stream)
        .await
        .expect("Error during the websocket handshake occurred");

    info!("New WebSocket connection: {}", addr);

    let (tx, rx) = unbounded();
    let (write, read) = ws_stream.split();

    let mut first_msg :bool = true;
    let mut name = String::new();

    let broadcast_incoming = read.try_for_each(|msg| {

        println!("{}: {}", addr, msg.to_text().unwrap());

        if first_msg {
            users.lock().unwrap().insert(msg.to_string(), tx.clone());
            name = msg.to_string();
            first_msg = false;
        }

        else {
            let users = users.lock().unwrap();
            let recipients = users.iter().filter(|(client_name, _)| client_name != &&name)
                                  .map(|(_, ws_sink)| ws_sink);
            for recp in recipients {
                recp.unbounded_send(msg.clone()).unwrap();
            }
        }
        future::ok(())
    });

    let receive_from_others = rx.map(Ok).forward(write);
    pin_mut!(broadcast_incoming, receive_from_others);
    future::select(broadcast_incoming, receive_from_others).await;

    println!("{} disconnected", &addr);
    users.lock().unwrap().remove(&addr.to_string());
}

Do not block the thread in async code. Take a look at this line:

let cont: String = read!();

This line is not instant because it waits for the user. It also has no .await, so it blocks the thread, which prevents other tasks from executing. Because of this, you should use std::thread::spawn to run it on its own thread instead of on the Tokio executor. Alternatively, you could use async IO, but that is not recommended for interactive terminal IO. Please read the section on CPU-bound tasks and blocking code in Tokio's documentation.

I don't immediately see any other issues, but blocking the thread can definitely cause havoc.
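
As a rough illustration of that suggestion, here is a minimal sketch of the prompt loop written as a plain (non-async) function that can run on its own thread. It uses only the standard library so it stands alone: the std `Sender<String>` is a stand-in for the question's `UnboundedSender<Message>` (whose `unbounded_send` is likewise a non-async call), and `poll_all`, the `Users` alias, and the `input` parameter are names made up for this example.

```rust
use std::collections::HashMap;
use std::io::BufRead;
use std::sync::mpsc::Sender;
use std::sync::{Arc, Mutex};

// Stand-in for the `Tx` alias in the question (assumption: the real code
// would keep `UnboundedSender<Message>` here).
type Tx = Sender<String>;
type Users = Arc<Mutex<HashMap<String, Tx>>>;

// Send one poll to every registered client; returns how many sends succeeded.
fn poll_all(users: &Users) -> usize {
    let users = users.lock().unwrap();
    let mut sent = 0;
    for (name, tx) in users.iter() {
        println!("sending to: {}", name);
        if tx.send("guess".to_string()).is_ok() {
            sent += 1;
        }
    }
    sent
}

// Plain prompt loop. Blocking on input is fine here because the function is
// meant to run on its own OS thread, not on a Tokio worker thread.
// Returns the total number of polls sent once the input ends.
fn take_turn(users: Users, input: impl BufRead) -> usize {
    let mut sent = 0;
    for line in input.lines() {
        let line = match line {
            Ok(line) => line,
            Err(_) => break,
        };
        if line.trim() == "y" {
            sent += poll_all(&users);
        }
    }
    sent
}
```

From `main`, this could be started with something like `std::thread::spawn(move || take_turn(users, std::io::BufReader::new(std::io::stdin())))` in place of `tokio::spawn(take_turn(...))`. Taking the input as a parameter is only a convenience for testing; reading from stdin directly inside the function would work just as well.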

Hey, thanks for the quick response, I've been stuck on this one. I'm actually okay if it blocks right there. I wanted it to wait for my input before trying to send to clients. The issue is that the for loop isn't working; it just sends to the first one and stops. Could this read!() be affecting that? I'll work on moving it to a std::thread::spawn and see if that changes it. Thanks for the recommendation.

let mut users2 = users.clone();
std::thread::spawn(async move || take_turn(users2).await);

Added this but now it doesn't look like the take_turn function even runs. Weird.

You should make take_turn a non-async function to run it outside Tokio. I recommend not using nightly or the unstable async closure syntax.

What you currently wrote does the following:

  1. Spawns a thread.
  2. This thread calls the async closure, which immediately returns a future object.
  3. Since the closure has returned, the thread exits, and the JoinHandle provides the future object as the return value of the thread.

You ignored the JoinHandle in this case, though, and since the future object was never awaited, the future never ran.
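
The three steps above can be reproduced on stable Rust with a small standard-library-only sketch. The `take_turn` here is a hypothetical stand-in that just records whether its body ran, and `NoopWake` and `demo` are names invented for this example. The thread's closure only builds the future and returns it; the body runs only once something polls it.

```rust
use std::future::Future;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical stand-in for the async `take_turn`: records that its body ran.
async fn take_turn(ran: Arc<AtomicBool>) {
    ran.store(true, Ordering::SeqCst);
}

// A waker that does nothing; enough to poll a future that finishes at once.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Returns (did the body run before polling, did it run after polling).
fn demo() -> (bool, bool) {
    let ran = Arc::new(AtomicBool::new(false));
    let flag = ran.clone();

    // Steps 1-3: the thread calls the closure, which only creates the future
    // and returns it. The thread exits and the JoinHandle carries the future.
    let handle = std::thread::spawn(move || take_turn(flag));
    let future = handle.join().unwrap();
    let before = ran.load(Ordering::SeqCst);

    // Only polling (which is what `.await` or an executor does) runs the body.
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut future = Box::pin(future);
    let state = future.as_mut().poll(&mut cx);
    assert!(matches!(state, Poll::Ready(())));

    (before, ran.load(Ordering::SeqCst))
}
```

Calling `demo()` shows the flag unset after the thread has finished and set only after the explicit poll, which matches the behaviour seen with the `async move ||` closure.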

aha! that worked! thank you!

Did it also fix the issue with accept_connection not properly working?

I believe accept_connection was always working? Well, maybe not, since the unbounded_send in take_turn was sending messages and accept_connection was not broadcasting them back out. So yeah, it is working now. The next challenge is to not send out another poll before the response to the previous one comes in, but I think that's just a problem of state management.
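
One possible shape for that state management, sketched with standard-library channels only: the polling thread sends "guess" to one client, then blocks on a shared reply channel until that client answers (or a timeout passes) before moving on. Everything here is an assumption for illustration: `poll_in_turn`, the `(name, text)` reply tuple, and the idea that accept_connection would forward each incoming message into such a reply channel are not from the original code.

```rust
use std::sync::mpsc::{Receiver, Sender};
use std::time::Duration;

// Poll clients strictly one at a time. For each client: send "guess", then
// wait for a reply tagged with that client's name before polling the next.
// Returns each client's name with its reply, or None if no reply arrived.
fn poll_in_turn(
    clients: &[(String, Sender<String>)],
    replies: &Receiver<(String, String)>,
    timeout: Duration,
) -> Vec<(String, Option<String>)> {
    let mut results = Vec::new();
    for (name, tx) in clients {
        let mut answer = None;
        if tx.send("guess".to_string()).is_ok() {
            // Messages from other clients are skipped. The loop gives up on
            // this client once `timeout` passes with no message at all.
            while let Ok((from, text)) = replies.recv_timeout(timeout) {
                if &from == name {
                    answer = Some(text);
                    break;
                }
            }
        }
        results.push((name.clone(), answer));
    }
    results
}
```

Because this function blocks while waiting, it belongs on the dedicated polling thread rather than inside an async task.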

Sounds good! But yeah, blocking the thread is a very bad idea, so always make sure that your async fn never spends a long time between .awaits.
