Broadcast server - what is the best way?

Hello!
I am trying to write a simple TLS server. After reading a message from a client, the server has to send a message to all clients (TlsStream).
I have a shared object: Arc::new(Mutex::new(HashMap::<SocketAddr, tokio_rustls::server::TlsStream<tokio::net::TcpStream>>::new()))
But I have a problem with a double mutable borrow.
What is the best solution?
Interior mutability, or a solution based on the tokio example?

My full example: tokio-tls-example/main.rs on the borrow-problem branch of hanusek/tokio-tls-example (GitHub)

async fn handle_connection(peer_addr: SocketAddr, 
                           tls_stream: tokio_rustls::server::TlsStream<tokio::net::TcpStream>,
                           clients_map: Arc<Mutex<HashMap<SocketAddr, DataStream>>>) 
{
    tracing::info!("New connection: {}", peer_addr);

    let mut clients_mutex = clients_map.lock().await; 
    clients_mutex.insert(peer_addr, tls_stream);

    tracing::info!("Clients number: {:?}", clients_mutex.len() + 1);

    const BUFF_SIZE: usize = 512;
    let mut buf = vec![0; BUFF_SIZE];

    let tls_stream = clients_mutex.get_mut(&peer_addr).unwrap();

    while let Ok(n) = tls_stream.read(&mut buf).await
    {
         if n == 0 { 
            tracing::error!("No message from: {}, disconnect!", peer_addr); 
            return;
         }

         let text_msg = String::from_utf8_lossy(&buf[..n]).to_string();
         tracing::info!("recv bytes: {} {:?} from peer: {:?}", n, text_msg, peer_addr);
        
         // Send to all clients
         for (peer_addr, stream) in clients_mutex.iter_mut() 
         {
             stream.write_all(format!("New client: {:?}", peer_addr.clone()).as_str().as_bytes()).await.expect("failed to write data to socket");
             tracing::info!("send msg to peer: {:?}", peer_addr);
         }
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> 
{
    tracing_subscriber::fmt().init();

    let config : ServerConfig = ... ;

    let acceptor = TlsAcceptor::from(Arc::new(config));
    let listener = TcpListener::bind("0.0.0.0:9900").await?;
    tracing::info!("listening on port {:?}", listener.local_addr()?);

    let clients_map  = Arc::new(Mutex::new(HashMap::<SocketAddr, _>::new()));

    loop {

        let (stream, peer_addr) = listener.accept().await?;
        let acceptor = acceptor.clone();
        let mut tls_stream = acceptor.accept(stream).await?;
        
        let clients_map_cloned = clients_map.clone();

        tokio::spawn(async move {
            handle_connection(peer_addr, tls_stream, clients_map_cloned).await;
        });
    }
}

log:

error[E0499]: cannot borrow `clients_mutex` as mutable more than once at a time
   --> src/main.rs:125:37
    |
112 |     let tls_stream = clients_mutex.get_mut(&peer_addr).unwrap();
    |                      --------------------------------- first mutable borrow occurs here
113 |
114 |     while let Ok(n) = tls_stream.read(&mut buf).await
    |                       ------------------------- first borrow later used here
...
125 |          for (peer_addr, stream) in clients_mutex.iter_mut() 
    |                                     ^^^^^^^^^^^^^^^^^^^^^^^^ second mutable borrow occurs here

Used libraries:

Don't put the new stream in the Mutex until after you've sent all the messages. This would also avoid sending the "new client" message to the new stream, which your code currently would do (if it compiled).

What do you mean?

async fn handle_connection(peer_addr: SocketAddr, 
                           mut tls_stream: tokio_rustls::server::TlsStream<tokio::net::TcpStream>,
                           clients_map: Arc<Mutex<HashMap<SocketAddr, tokio_rustls::server::TlsStream<tokio::net::TcpStream>>>>) 
{
    tracing::info!("New connection: {}", peer_addr);
        
    const BUFF_SIZE: usize = 512;
    let mut buf = vec![0; BUFF_SIZE];
    while let Ok(n) = tls_stream.read(&mut buf).await
    {
        if n == 0 { 
            tracing::error!("No message from: {}, disconnect!", peer_addr); 
            return;
        }

        let text_msg = String::from_utf8_lossy(&buf[..n]).to_string();
        tracing::info!("recv bytes: {} {:?} from peer: {:?}", n, text_msg, peer_addr);
    }

    {
        let mut clients_mutex = clients_map.lock().await; 

        // Send to all clients
        for (peer_addr, stream) in clients_mutex.iter_mut() 
        {
            stream.write_all(format!("New client: {:?}", peer_addr.clone()).as_str().as_bytes()).await.expect("failed to write data to socket");
            tracing::info!("send msg to peer: {:?}", peer_addr);
        }

        clients_mutex.insert(peer_addr, tls_stream);

        tracing::info!("Clients number: {:?}", clients_mutex.len() + 1);
    }
}

Like that? It doesn't work, because the client sends data to the server continuously, so the while loop never ends.

Oh, I didn't notice the outer loop. That's not going to work, because you would keep the whole mutex locked for as long as the connection is running. You may want to look at split in tokio::io, which lets you read and write through separate handles to a stream. That should help with the ownership issues.

You can have the read task take ownership of the read handle and store the write handle in the map. Since you only need the write handles to respond to messages, you don't end up needing to double-borrow the map.

async fn handle_connection(
    peer_addr: SocketAddr,
    mut tls_stream: TlsStream<TcpStream>,
    clients_map: Arc<Mutex<HashMap<SocketAddr, tokio::io::WriteHalf<TlsStream<TcpStream>>>>>,
) {
    tracing::info!("New connection: {}", peer_addr);

    let (mut read, write) = tokio::io::split(tls_stream);
    let mut clients_mutex = clients_map.lock().await;
    clients_mutex.insert(peer_addr, write);

    tracing::info!("Clients number: {:?}", clients_mutex.len() + 1);

    const BUFF_SIZE: usize = 512;
    let mut buf = vec![0; BUFF_SIZE];

    while let Ok(n) = read.read(&mut buf).await {
        if n == 0 {
            tracing::error!("No message from: {}, disconnect!", peer_addr);
            return;
        }

        let text_msg = String::from_utf8_lossy(&buf[..n]).to_string();
        tracing::info!(
            "recv bytes: {} {:?} from peer: {:?}",
            n,
            text_msg,
            peer_addr
        );

        // Send to all clients
        for (peer_addr, stream) in clients_mutex.iter_mut() {
            stream
                .write_all(
                    format!("New client: {:?}", peer_addr.clone())
                        .as_str()
                        .as_bytes(),
                )
                .await
                .expect("failed to write data to socket");
            tracing::info!("send msg to peer: {:?}", peer_addr);
        }
    }
}

Thanks for your reply. But your solution sends the "new client" message every time a message is received.
The program has to send this information only once and then keep processing messages from the client (the while loop).

I just made the code you posted build. You can trivially fix the repeated "new client" messages by moving that part out of the while loop.

There is also the problem that you lock the mutex outside the while loop. This means you will only serve the first connection; all the others will simply wait for the lock to be released.

You should lock only when inserting a new element into the hashmap, and around the for loop that iterates once over all clients to send a message, and nothing more.
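
The lock-scoping advice above could be sketched roughly as follows. This is not the tokio code from the thread but a synchronous, std-only analogue, so that it runs without extra crates: std::sync::Mutex stands in for tokio's Mutex, and any std::io::Write value stands in for the write half of a TLS stream. The names Clients, register and broadcast are hypothetical, and relaying the received bytes to every client is an assumption about what the server should send.

```rust
use std::collections::HashMap;
use std::io::{self, Read, Write};
use std::net::SocketAddr;
use std::sync::{Arc, Mutex};

// Shared map from peer address to that peer's write handle.
// `W` is a stand-in for the write half of a stream (assumption).
type Clients<W> = Arc<Mutex<HashMap<SocketAddr, W>>>;

/// Inserts a write handle, holding the lock only for the insert.
/// Returns the number of registered clients.
fn register<W: Write>(clients: &Clients<W>, addr: SocketAddr, writer: W) -> usize {
    let mut map = clients.lock().unwrap();
    map.insert(addr, writer);
    map.len()
} // the guard is dropped here, so the lock is released

/// Writes `msg` to every registered client, holding the lock only for
/// this one pass. Returns how many writes succeeded.
fn broadcast<W: Write>(clients: &Clients<W>, msg: &[u8]) -> usize {
    let mut map = clients.lock().unwrap();
    map.values_mut()
        .filter_map(|w| w.write_all(msg).ok())
        .count()
}

/// Per-connection logic: register, announce once, then relay messages.
fn handle_connection<R: Read, W: Write>(
    addr: SocketAddr,
    mut reader: R,
    writer: W,
    clients: &Clients<W>,
) -> io::Result<()> {
    register(clients, addr, writer);

    // The "new client" announcement is sent once, before the read loop.
    broadcast(clients, format!("New client: {}\n", addr).as_bytes());

    let mut buf = [0u8; 512];
    let result = loop {
        // No lock is held while waiting for input from this client.
        match reader.read(&mut buf) {
            Ok(0) => break Ok(()), // peer closed the connection
            Ok(n) => {
                broadcast(clients, &buf[..n]);
            }
            Err(e) => break Err(e),
        }
    };

    // Remove the client's write handle on disconnect, again under a short lock.
    clients.lock().unwrap().remove(&addr);
    result
}
```

In the async version the structure would stay the same: the map would hold the write halves returned by tokio::io::split, and each lock would become clients_map.lock().await inside its own small scope. Note that the lock is still held while the broadcast loop writes, so one slow client can delay the others for the duration of that pass.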
