[Solved] Stuck with a simple asynchronous server app

Hi there! I am a relatively new Rust user, although I have worked on a couple of projects using the language and I am enjoying it very much so far. I am currently investigating asynchronous programming with futures-preview = "0.3.0-alpha.16" and runtime = "0.3.0-alpha.6". It feels very cool, but I have run into an issue that I am unable to solve.

To go a bit beyond the basic examples I wanted to create an app that:

  1. Accepts TCP connections on a given port;
  2. Broadcasts all the data received from any connection to all active connections.

This took me a while to hack together, but here is the working code:

#![feature(async_await)]
#![feature(async_closure)]

use futures::{
    prelude::*,
    stream::select,
    io::{ReadHalf, WriteHalf},
    channel::mpsc::{unbounded, UnboundedSender}
};

use runtime::net::{TcpListener, TcpStream};

use std::{
    io,
    net::SocketAddr,
    collections::HashMap,
};

async fn read_stream(
    addr: SocketAddr,
    mut reader: ReadHalf<TcpStream>,
    sender: UnboundedSender<(SocketAddr, io::Result<Box<[u8]>>)>
) {
    loop {
        let mut buffer: Vec<u8> = vec![0; 1024];
        match reader.read(&mut buffer).await {
            Ok(len) => {
                buffer.truncate(len);
                sender.unbounded_send((addr, Ok(buffer.into_boxed_slice()))).expect("Channel error");
                if len == 0 {
                    return;
                }
            },
            Err(err) => {
                sender.unbounded_send((addr, Err(err))).expect("Channel error");
                return;
            }
        }
    }
}

enum Event {
    Connection(io::Result<TcpStream>),
    Message(SocketAddr, io::Result<Box<[u8]>>),
}

#[runtime::main]
async fn main() -> std::io::Result<()> {
    let mut listener = TcpListener::bind("127.0.0.1:8080")?;
    eprintln!("Listening on {}", listener.local_addr()?);

    let mut writers: HashMap<SocketAddr, WriteHalf<TcpStream>> = HashMap::new();
    let (sender, receiver) = unbounded();

    let connections = listener.incoming().map(Event::Connection);
    let messages = receiver.map(|(addr, maybe_message)| Event::Message(addr, maybe_message));
    let mut events = select(connections, messages);

    loop {
        match events.next().await {
            Some(Event::Connection(Ok(stream))) => {
                let addr = stream.peer_addr().unwrap();
                eprintln!("New connection from {}", addr);

                let (reader, writer) = stream.split();
                writers.insert(addr, writer);
                runtime::spawn(read_stream(addr, reader, sender.clone()));
            },
            Some(Event::Message(addr, Ok(message))) => {
                if message.is_empty() {
                    eprintln!("Connection closed: {}", addr);
                    writers.remove(&addr);
                } else {
                    eprintln!("Received {} bytes from {}", message.len(), addr);
                    for (&other_addr, writer) in &mut writers {
                        if addr != other_addr {
                            writer.write_all(&message).await.ok(); // Ignore errors
                        }
                    }
                }
            },
            Some(Event::Message(addr, Err(err))) => {
                eprintln!("Error reading from {}: {}", addr, err);
                writers.remove(&addr);
            },
            // Reached when accepting a connection fails or when the event stream ends.
            _ => panic!("Event error"),
        }
    }
}

It does solve the task at hand, but it has a serious flaw: there is no way to drop a connection on the server side! Sure, you can close the WriteHalf<TcpStream>, but this won’t prevent the client from feeding you more data (technically, closing the WriteHalf does not close the socket). It is probably solvable by adding a channel from the server loop to every reading task, so that when a connection is dropped the server loop can tell the corresponding reader to stop reading. This approach doesn’t feel right though, as it further complicates an already not-so-simple design.
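To make the problem concrete, here is a minimal sketch of a server-side drop that uses only the standard library. It swaps the async tasks for blocking threads and std::net, so it is an illustration of the idea, not a drop-in fix for the code above. It assumes the server keeps a second handle to the socket (via try_clone) and calls shutdown(Shutdown::Both) on it, which is expected to make the pending read in the reader thread finish with EOF or an error. The name drop_client_demo is hypothetical, and the sketch does not assume that the runtime crate exposes an equivalent call.

```rust
use std::io::{self, Read, Write};
use std::net::{Shutdown, TcpListener, TcpStream};
use std::sync::mpsc::{channel, Sender};
use std::thread;

/// Forwards every chunk read from `stream` to `sender` until EOF or a read error.
fn read_stream(mut stream: TcpStream, sender: Sender<Vec<u8>>) {
    let mut buffer = [0u8; 1024];
    loop {
        match stream.read(&mut buffer) {
            // EOF (peer closed, or the socket was shut down locally) or an error: stop.
            Ok(0) | Err(_) => return,
            Ok(len) => {
                if sender.send(buffer[..len].to_vec()).is_err() {
                    return; // The server loop is gone.
                }
            }
        }
    }
}

/// Hypothetical demo: a client sends "bye" and stays connected; the server then
/// drops it by shutting the socket down. Returns the bytes the server received.
fn drop_client_demo() -> io::Result<Vec<u8>> {
    let listener = TcpListener::bind("127.0.0.1:0")?;
    let mut client = TcpStream::connect(listener.local_addr()?)?;
    let (stream, _addr) = listener.accept()?;

    // Second handle to the same socket, kept by the "server loop".
    let control = stream.try_clone()?;
    let (sender, receiver) = channel();
    let reader = thread::spawn(move || read_stream(stream, sender));

    client.write_all(b"bye")?;

    // Collect chunks until the farewell message has arrived.
    let mut received = Vec::new();
    while !received.ends_with(b"bye") {
        match receiver.recv() {
            Ok(chunk) => received.extend_from_slice(&chunk),
            Err(_) => break, // The reader stopped early.
        }
    }

    // Server-side drop: the client never closed its end, yet the reader returns.
    control.shutdown(Shutdown::Both)?;
    reader.join().expect("reader thread panicked");
    drop(client);
    Ok(received)
}
```

Calling drop_client_demo() from main should return once the reader thread has exited, even though the client is still connected at that point. The design choice here is that the reader needs no extra stop channel: shutting down the shared socket is the stop signal.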

My guess is that select_all could be of help, but I’ve had no luck trying to make it work here.

Could anyone experienced in futures and asynchronous programming show how to rewrite this code so that the server can drop clients at will? E.g. when a client sends “bye”, the server closes the TCP connection.
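For the “bye” example, the check itself could look like the sketch below. The helper name is_bye is hypothetical, and it assumes the whole word arrives in a single read; since TCP is a byte stream, a real server would probably need to buffer input and split it into lines first. Surrounding whitespace is ignored because tools such as telnet or nc usually append a line terminator.

```rust
/// Returns true if `message` is exactly "bye" once surrounding ASCII whitespace is ignored.
fn is_bye(message: &[u8]) -> bool {
    // Index of the first and last non-whitespace byte, if any.
    let start = message.iter().position(|b| !b.is_ascii_whitespace());
    let end = message.iter().rposition(|b| !b.is_ascii_whitespace());
    match (start, end) {
        (Some(start), Some(end)) => &message[start..=end] == b"bye",
        _ => false, // Empty or whitespace-only message.
    }
}
```

In the server loop above, such a check would sit in the Event::Message branch, before the message is broadcast to the other writers.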

For completeness, my Cargo.toml:

[package]
name = "async-test"
version = "0.1.0"
authors = ["xxx"]
edition = "2018"

[dependencies]
runtime = "0.3.0-alpha.6"
futures-preview = { version = "=0.3.0-alpha.16", features = ["async-await", "nightly"] }

If anyone is interested, I solved it: https://stackoverflow.com/questions/56930108/select-from-a-list-of-sockets-using-futures/56936467#56936467
