I'm trying to develop a function that will go through a list of connected websocket clients and poll them. For example: poll client 1, wait for response, process it -> poll client 2, wait for response... -> final client.
The issue I'm encountering currently with the following code is that with one connected client the unbounded_send
doesn't seem to be firing. However, with two connecting clients it's only polling one client each loop. I did notice that the previous sends that didn't seem to be firing with one connected user all seem to finally go through when the second is connected.
Here's the code I have, in particular it's the take_turn
function:
use std::{env, io::Error};
use std::io;
use std::io::Write;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use text_io::read;
// use futures_util::StreamExt;
use log::info;
use tokio::net::{TcpListener, TcpStream};
use futures::{
channel::mpsc::{unbounded, UnboundedSender},
future, pin_mut,
stream::TryStreamExt,
StreamExt,
};
use tungstenite::protocol::Message;
type Tx = UnboundedSender<Message>;
#[tokio::main]
async fn main() -> Result<(), Error> {
let mut users = Arc::new(Mutex::new(HashMap::new()));
let _ = env_logger::try_init();
let addr = env::args()
.nth(1)
.unwrap_or_else(|| "127.0.0.1:8080".to_string());
// Create the event loop and TCP listener we'll accept connections on.
let try_socket = TcpListener::bind(&addr).await;
let mut listener = try_socket.expect("Failed to bind");
info!("Listening on: {}", addr);
tokio::spawn(take_turn(users.clone()));
while let Ok((stream, _)) = listener.accept().await {
tokio::spawn(accept_connection(stream, users.clone()));
}
Ok(())
}
async fn take_turn(users: Arc<Mutex<HashMap<String, Tx>>>) {
loop {
// println!("-------");
println!("\n");
print!("\r");
std::io::stdout().flush().unwrap();
println!("poll users: ");
std::io::stdout().flush().unwrap();
let cont: String = read!();
if cont == "y".to_string() {
for (name, tx) in users.lock().unwrap().iter() {
println!("sending to: {}", name);
tx.unbounded_send(Message::Text("guess".to_string()));
}
}
}
}
async fn accept_connection(stream: TcpStream, users: Arc<Mutex<HashMap<String, Tx >>>) {
let addr = stream
.peer_addr()
.expect("connected streams should have a peer address");
info!("Peer address: {}", addr);
let ws_stream = tokio_tungstenite::accept_async(stream)
.await
.expect("Error during the websocket handshake occurred");
info!("New WebSocket connection: {}", addr);
let (tx, rx) = unbounded();
let (write, read) = ws_stream.split();
let mut first_msg :bool = true;
let mut name = String::new();
let broadcast_incoming = read.try_for_each(|msg| {
println!("{}: {}", addr, msg.to_text().unwrap());
if first_msg {
users.lock().unwrap().insert(msg.to_string(), tx.clone());
name = msg.to_string();
first_msg = false;
}
else {
let users = users.lock().unwrap();
let recipients = users.iter().filter(|(client_name, _)| client_name != &&name)
.map(|(_, ws_sink)| ws_sink);
for recp in recipients {
recp.unbounded_send(msg.clone()).unwrap();
}
}
future::ok(())
});
let receive_from_others = rx.map(Ok).forward(write);
pin_mut!(broadcast_incoming, receive_from_others);
future::select(broadcast_incoming, receive_from_others).await;
println!("{} disconnected", &addr);
users.lock().unwrap().remove(&addr.to_string());
}