Confusion about using futures_channel::Sender vs UnboundedSender

I created a WebSocket server in Rust, based on a server example from tokio-tungstenite. The code below is a slight modification of that example, meant to mimic an application that does some processing and then streams the data to the connected peers. You can ignore the fact that the server also echoes received messages to the other peers; I removed that from my real application.

This sample code just sends a message to all of the peers every 5 seconds. The difficulty I'm having is dealing with slow clients. If a client is very slow at receiving messages, memory usage grows as messages back up in the UnboundedSender.

I thought to use the regular Sender from futures_channel and limit the amount of data buffered for each client. However, the Sender methods require the Sender reference to be mutable, which doesn't seem possible in my example.

I'm open to other options, but I'm at a loss at the moment what my other options are to either drop slow clients or limit the amount of back-pressure allowed.

use std::{
    collections::HashMap,
    env,
    io::Error as IoError,
    net::SocketAddr,
    sync::{Arc, Mutex},
};
use std::task::Poll;

use futures_channel::mpsc::{channel, unbounded, Sender, UnboundedSender};
use futures_core::task::{Context};
use futures_util::{future, pin_mut, stream::TryStreamExt, StreamExt};

use tokio::net::{TcpListener, TcpStream};
use tokio_tungstenite::tungstenite::protocol::Message;

type Tx = Sender<Message>;
type PeerMap = Arc<Mutex<HashMap<SocketAddr, Tx>>>;

async fn handle_connection(peer_map: PeerMap, raw_stream: TcpStream, addr: SocketAddr) {
    println!("Incoming TCP connection from: {}", addr);

    let ws_stream = tokio_tungstenite::accept_async(raw_stream)
        .await
        .expect("Error during the websocket handshake occurred");
    println!("WebSocket connection established: {}", addr);

    // Insert the write part of this peer to the peer map.
    let (tx, rx) = channel(1000);
    peer_map.lock().unwrap().insert(addr, tx);

    let (outgoing, incoming) = ws_stream.split();

    let broadcast_incoming = incoming.try_for_each(|msg| {
        println!("Received a message from {}: {}", addr, msg.to_text().unwrap());
        let peers = peer_map.lock().unwrap();

        // We want to broadcast the message to everyone except ourselves.
        let broadcast_recipients =
            peers.iter().filter(|(peer_addr, _)| peer_addr != &&addr).map(|(_, ws_sink)| ws_sink);

        for recp in broadcast_recipients {
            recp.unbounded_send(msg.clone()).unwrap();
        }

        future::ok(())
    });

    let receive_from_others = rx.map(Ok).forward(outgoing);

    pin_mut!(broadcast_incoming, receive_from_others);
    future::select(broadcast_incoming, receive_from_others).await;

    println!("{} disconnected", &addr);
    peer_map.lock().unwrap().remove(&addr);
}

#[tokio::main]
async fn main() -> Result<(), IoError> {
    let addr = env::args().nth(1).unwrap_or_else(|| "127.0.0.1:8080".to_string());

    let state = PeerMap::new(Mutex::new(HashMap::new()));

    // Create the event loop and TCP listener we'll accept connections on.
    let try_socket = TcpListener::bind(&addr).await;
    let listener = try_socket.expect("Failed to bind");
    println!("Listening on: {}", addr);

    // Spawn a task that sends a message to all the peers every 5 seconds.
    let state2 = state.clone();
    tokio::spawn(async move {
        let mut interval = tokio::time::interval(std::time::Duration::from_secs(5));
        loop {
            interval.tick().await;
            let peers = state2.lock().unwrap();
            let peers = peers.iter().map(|(_, ws_sink)| ws_sink);
            for recp in peers {
                let mut ctx = Context::from_waker(futures_util::task::noop_waker_ref());
                match recp.poll_ready(&mut ctx) {
                    Poll::Ready(Ok(_)) => {
                        recp.start_send(Message::Text("Hello from server".to_string())).unwrap();
                    },
                    Poll::Pending => {
                        println!("Peer is not ready");
                        continue;
                    },
                    Poll::Ready(Err(_)) => {
                        println!("Peer dropped.");
                        continue;
                    },
                }
            }
        }
    });

    // Let's spawn the handling of each connection in a separate task.
    while let Ok((stream, addr)) = listener.accept().await {
        tokio::spawn(handle_connection(state.clone(), stream, addr));
    }

    Ok(())
}

Any reason not to use tokio::sync::mpsc::Sender? It's cloneable and bounded, and its try_send takes &self, so it works behind a shared reference.
