Dereference a struct inside an Arc without using a Mutex

Hello everyone!
I'm new to Rust and in order to learn more about Rust's concurrency model, I have been trying to write a simple chat application that runs over TCP and concurrently processes clients.
The way that I'm doing this is by storing the client TCP connections (or rather a clone of those) in a Vec and then broadcasting to all clients whenever someone sends a message.
In doing so, I'm running into an issue: the compiler won't allow me to mutably dereference a value inside an Arc. After some forum reading, it seems like the usual recommendation is to just wrap the value inside the Arc in a Mutex. However, in my case, that won't really solve the concurrency issue at hand, as it would put the entire client handling logic behind a single lock.

I was wondering if someone can help me with this. Please find the code below - note that it won't really compile. Here is also a playground link.

use std::env;
use std::io::{Read, Write};
use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener, TcpStream};
use std::sync::{Arc, Mutex};
use std::thread;

const DEFAULT_PORT: u16 = 12800;

fn main() {
    let port_num = get_port().unwrap_or_else(|_e| {
        println!(
            "Program started without a port number, starting with default port {}",
            DEFAULT_PORT
        );
        DEFAULT_PORT
    });
    let server = Server::new(port_num);
    let handles = server.start();
    for handle in handles {
        handle.join().unwrap();
    }
}

//TODO: Replace this with a flag parser instead
fn get_port() -> Result<u16, &'static str> {
    let args: Vec<_> = env::args().collect();
    if args.len() < 2 {
        return Err("Invalid number of arguments");
    }
    let port_number = &args[1];
    let parse_result = port_number.parse::<u16>();
    match parse_result {
        Ok(port) => Ok(port),
        Err(_err) => Err("Invalid argument"),
    }
}

const EOT: u8 = 0x4;

struct ServerInner {
    clients: Mutex<Vec<TcpStream>>,
}

pub struct Server {
    port: u16,
    inner: Arc<ServerInner>, //If I wrap the ServerInner in a Mutex, it would compile but won't solve my concurrency requirement
}

impl Server {
    pub fn new(port: u16) -> Server {
        Server {
            port: port,
            inner: Arc::new(ServerInner {
                clients: Mutex::new(vec![]),
            }),
        }
    }
    pub fn start(&self) -> Vec<thread::JoinHandle<()>> {
        let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), self.port);
        let listener = TcpListener::bind(socket).unwrap(); //TODO: better error handling
        let mut handles: Vec<thread::JoinHandle<()>> = vec![];
        for stream_res in listener.incoming() {
            match stream_res {
                Ok(stream) => {
                    println!("Handling the client in a new thread");
                    let local_self = self.inner.clone();
                    let handle = thread::spawn(move || {
                        if let Err(err) = local_self.handle_client(&stream) { //I want to concurrently process requests from many clients. If I wrap the inner server in a Mutex just to get DerefMut it defeats the concurrency purpose
                            eprintln!("Failed to handle the client {}", err);
                        }
                    });
                    handles.push(handle);
                }
                Err(err) => {
                    eprintln!("Unable to process the TCP connection {}", err)
                }
            }
        }
        return handles;
    }
}

impl ServerInner {
    fn handle_client(&mut self, mut conn: &TcpStream) -> Result<(), &str> {
        self.clients.lock().unwrap().push(conn.try_clone().unwrap());
        loop {
            let mut read_buffer = [0; 1024]; //1 KiB buffer
            match conn.read(&mut read_buffer) {
                Ok(nbytes) => {
                    println!("read {} bytes from the client", nbytes);
                    if nbytes == 0 {
                        println!("client exited - closing connection");
                        return Ok(());
                    }
                    if nbytes == 1 && read_buffer[0] == EOT {
                        println!("client exited - end of transmission received");
                        return Ok(());
                    }
                    for c in self.clients.lock().unwrap().iter_mut() {
                        c.write(&read_buffer[0..nbytes]).unwrap();
                    }
                }
                Err(_err) => return Err("Error reading from the client"),
            }
        }
    }
}

(Playground)

handle_client should take a shared &self reference instead of an exclusive &mut self one. The Mutex inside ServerInner is an interior mutability primitive: locking it lets you safely get an &mut to the Vec from the & that you pass in, so the Arc<ServerInner> itself never has to be dereferenced mutably.
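
To illustrate the point, here is a minimal sketch rather than a drop-in fix for the code above. Registry, register, count and register_concurrently are hypothetical names, and the Vec holds Strings instead of TcpStreams so that it runs without a network. The shape is the same as ServerInner: a Mutex field, methods that take &self, and threads that share the value through an Arc.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical stand-in for ServerInner: Strings replace TcpStreams
// so the sketch runs without opening any sockets.
struct Registry {
    clients: Mutex<Vec<String>>,
}

impl Registry {
    // Takes &self, not &mut self: locking the Mutex yields the
    // mutable access to the Vec.
    fn register(&self, name: String) {
        self.clients.lock().unwrap().push(name);
    }

    fn count(&self) -> usize {
        self.clients.lock().unwrap().len()
    }
}

// Spawns `n` threads that each register one client through a shared Arc,
// then returns how many clients ended up in the registry.
fn register_concurrently(n: usize) -> usize {
    let registry = Arc::new(Registry {
        clients: Mutex::new(Vec::new()),
    });
    let handles: Vec<_> = (0..n)
        .map(|i| {
            let local = Arc::clone(&registry);
            thread::spawn(move || local.register(format!("client-{}", i)))
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    registry.count()
}

fn main() {
    // prints "registered 4 clients"
    println!("registered {} clients", register_concurrently(4));
}
```

The lock guard here is a temporary that is dropped at the end of the statement that created it, so the Mutex is held only while the Vec is being touched. Applied to the original code, that means the blocking conn.read call happens outside the lock, and only the push and the broadcast loop are serialized.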
