Hello everyone!
I'm new to Rust and in order to learn more about Rust's concurrency model, I have been trying to write a simple chat application that runs over TCP and concurrently processes clients.
The way I'm doing this is by storing the client TCP connections (or rather clones of them) in a Vec and then broadcasting to all clients whenever someone sends a message.
In doing so, I'm running into an issue: the compiler won't allow me to mutably dereference a value inside an Arc. After some forum reading, it seems the usual recommendation is to wrap the value inside the Arc in a Mutex. However, in my case that won't really solve the concurrency issue at hand, as it would put the entire client handling logic behind a single lock.
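To isolate the problem from the networking code, here is a minimal sketch of the pattern I think I'm after. Registry, register and run_demo are hypothetical names that do not appear in my server. The assumption is that only the shared list needs a Mutex, and that a method taking &self (rather than &mut self) can still modify the list through that Mutex, so the Arc never has to hand out a mutable reference.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical stand-in for the shared server state:
// only the list itself sits behind a Mutex.
struct Registry {
    names: Mutex<Vec<String>>,
}

impl Registry {
    // Takes &self, not &mut self: the Mutex supplies the mutability.
    fn register(&self, name: String) {
        // The lock is held only for the duration of this statement.
        self.names.lock().unwrap().push(name);
    }
}

// Spawns `n` threads that share one Registry through an Arc and
// returns how many names ended up in the list.
fn run_demo(n: usize) -> usize {
    let registry = Arc::new(Registry {
        names: Mutex::new(Vec::new()),
    });
    let handles: Vec<_> = (0..n)
        .map(|i| {
            let local = Arc::clone(&registry);
            thread::spawn(move || local.register(format!("client-{}", i)))
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    let count = registry.names.lock().unwrap().len();
    count
}
```

Calling run_demo(4) spawns four threads that each add one entry. In this reduced form everything compiles, which makes me suspect that the &mut self receiver on my client handler, rather than the Arc itself, may be what the compiler is objecting to.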
I was wondering if someone can help me with this. Please find the code below - note that it won't really compile. Here is also a playground link.
use std::env;
use std::io::{Read, Write};
use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener, TcpStream};
use std::sync::{Arc, Mutex};
use std::thread;
const DEFAULT_PORT: u16 = 12800;
fn main() {
    let port_num = get_port().unwrap_or_else(|_e| {
        println!(
            "Program started without a port number, starting with default port {}",
            DEFAULT_PORT
        );
        DEFAULT_PORT
    });
    let server = Server::new(port_num);
    let handles = server.start();
    for handle in handles {
        handle.join().unwrap();
    }
}
//TODO: Replace this with a flag parser instead
fn get_port() -> Result<u16, &'static str> {
    let args: Vec<_> = env::args().collect();
    if args.len() < 2 {
        return Err("Invalid number of arguments");
    }
    let port_number = &args[1];
    let parse_result = port_number.parse::<u16>();
    match parse_result {
        Ok(port) => Ok(port),
        Err(_err) => Err("Invalid argument"),
    }
}
const EOT: u8 = 0x4;
struct ServerInner {
    clients: Mutex<Vec<TcpStream>>,
}
pub struct Server {
    port: u16,
    // If I wrap the ServerInner in a Mutex, it would compile but won't solve
    // my concurrency requirement
    inner: Arc<ServerInner>,
}
impl Server {
    pub fn new(port: u16) -> Server {
        Server {
            port,
            inner: Arc::new(ServerInner {
                clients: Mutex::new(vec![]),
            }),
        }
    }
    pub fn start(&self) -> Vec<thread::JoinHandle<()>> {
        let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), self.port);
        let listener = TcpListener::bind(socket).unwrap(); //TODO: better error handling
        let mut handles: Vec<thread::JoinHandle<()>> = vec![];
        for stream_res in listener.incoming() {
            match stream_res {
                Ok(stream) => {
                    println!("Handling the client in a new thread");
                    let local_self = self.inner.clone();
                    let handle = thread::spawn(move || {
                        // I do want to concurrently process requests from many clients.
                        // If I wrap the inner server in a Mutex just to get DerefMut,
                        // it defeats the concurrency purpose.
                        if let Err(err) = local_self.handle_client(&stream) {
                            eprintln!("Failed to handle the client {}", err);
                        }
                    });
                    handles.push(handle);
                }
                Err(err) => {
                    eprintln!("Unable to process the TCP connection {}", err)
                }
            }
        }
        handles
    }
}
impl ServerInner {
    fn handle_client(&mut self, mut conn: &TcpStream) -> Result<(), &str> {
        self.clients.lock().unwrap().push(conn.try_clone().unwrap());
        loop {
            let mut read_buffer = [0; 1024]; // 1 KiB buffer
            match conn.read(&mut read_buffer) {
                Ok(nbytes) => {
                    println!("read {} bytes from the client", nbytes);
                    if nbytes == 0 {
                        println!("client exited - closing connection");
                        return Ok(());
                    }
                    if nbytes == 1 && read_buffer[0] == EOT {
                        println!("client exited - end of transmission received");
                        return Ok(());
                    }
                    for c in self.clients.lock().unwrap().iter_mut() {
                        c.write(&read_buffer[0..nbytes]).unwrap();
                    }
                }
                Err(_err) => return Err("Error reading from the client"),
            }
        }
    }
}
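For the broadcast step on its own, this is a rough sketch of how I imagine keeping the lock as short-lived as possible. The broadcast function is a hypothetical helper, not part of the code above. It assumes the client list is the only shared state, and it is generic over Write so that it can be tried with in-memory buffers instead of real TcpStream values.

```rust
use std::io::{self, Write};
use std::sync::Mutex;

// Sends `msg` to every client in the list. Generic over Write so it
// works with TcpStream as well as in-memory buffers such as Vec<u8>.
fn broadcast<W: Write>(clients: &Mutex<Vec<W>>, msg: &[u8]) -> io::Result<()> {
    // The lock is taken here and released when `guard` goes out of scope
    // at the end of this function, so it is never held across a blocking read.
    let mut guard = clients.lock().unwrap();
    for client in guard.iter_mut() {
        // write_all keeps writing until the whole slice has been sent,
        // whereas a single write call may stop after a partial write.
        client.write_all(msg)?;
    }
    Ok(())
}
```

With something like this, each client thread would block on its own read without holding any lock and only take the Mutex for the duration of the writes. One trade-off I can see is that a single slow client would still delay the broadcast for everyone else while the lock is held.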