Hi there! I am a relatively new Rust user, although I got my hands on a couple of projects using the language and I am enjoying it very much so far. I am currently investigating asynchronous programming with futures-preview = "0.3.0-alpha.16"
and runtime = "0.3.0-alpha.6"
. It feels very cool, but I stumbled upon some issue that I am unable to solve.
To go a bit beyond the basic examples I wanted to create an app that:
- Accepts TCP connections on a given port;
- Broadcasts all the data received from any connection to all active connections.
This took me a while to hack together, but here is the working code:
#![feature(async_await)]
#![feature(async_closure)]
use futures::{
prelude::*,
stream::select,
io::{ReadHalf, WriteHalf},
channel::mpsc::{unbounded, UnboundedSender}
};
use runtime::net::{TcpListener, TcpStream};
use std::{
io,
net::SocketAddr,
collections::HashMap,
};
async fn read_stream(
addr: SocketAddr,
mut reader: ReadHalf<TcpStream>,
sender: UnboundedSender<(SocketAddr, io::Result<Box<[u8]>>)>
) {
loop {
let mut buffer: Vec<u8> = vec![0; 1024];
match reader.read(&mut buffer).await {
Ok(len) => {
buffer.truncate(len);
sender.unbounded_send((addr, Ok(buffer.into_boxed_slice()))).expect("Channel error");
if len == 0 {
return;
}
},
Err(err) => {
sender.unbounded_send((addr, Err(err))).expect("Channel error");
return;
}
}
}
}
enum Event {
Connection(io::Result<TcpStream>),
Message(SocketAddr, io::Result<Box<[u8]>>),
}
#[runtime::main]
async fn main() -> std::io::Result<()> {
let mut listener = TcpListener::bind("127.0.0.1:8080")?;
eprintln!("Listening on {}", listener.local_addr()?);
let mut writers: HashMap<SocketAddr, WriteHalf<TcpStream>> = HashMap::new();
let (sender, receiver) = unbounded();
let connections = listener.incoming().map(|maybe_stream| Event::Connection(maybe_stream));
let messages = receiver.map(|(addr, maybe_message)| Event::Message(addr, maybe_message));
let mut events = select(connections, messages);
loop {
match events.next().await {
Some(Event::Connection(Ok(stream))) => {
let addr = stream.peer_addr().unwrap();
eprintln!("New connection from {}", addr);
let (reader, writer) = stream.split();
writers.insert(addr, writer);
runtime::spawn(read_stream(addr, reader, sender.clone()));
},
Some(Event::Message(addr, Ok(message))) => {
if message.len() == 0 {
eprintln!("Connection closed: {}", addr);
writers.remove(&addr);
} else {
eprintln!("Received {} bytes from {}", message.len(), addr);
for (&other_addr, writer) in &mut writers {
if addr != other_addr {
writer.write_all(&message).await.ok(); // Ignore errors
}
}
}
},
Some(Event::Message(addr, Err(err))) => {
eprintln!("Error reading from {}: {}", addr, err);
writers.remove(&addr);
},
_ => panic!("Event error"),
}
}
}
It does solve the task at hand, but has a serious flaw: there is no way to drop a connection on the server side! Sure, you can close WriteHalf<TcpStream>
, but this won't prevent the client to keep feeding you data (technically, closing WriteHalf
does not close socket). It is probably solvable via adding a channel from the server loop to every reading task, so that when the connection is dropped the server loop could send a message to the corresponding reader so that it stops reading. This approach doesn't feel right though, as it further complicates an already not-so-simple design.
My guess is that select_all could be of help, but I've had no luck trying to make it work here.
Could anyone experienced in futures and asyncronous programming show how to re-write this code so that server could drop clients at will? E.g. when client sends "bye", server closes TCP connection.
For completeness, my Cargo.toml
:
name = "async-test"
version = "0.1.0"
authors = ["xxx"]
edition = "2018"
[dependencies]
runtime = "0.3.0-alpha.6"
futures-preview = { version = "=0.3.0-alpha.16", features = ["async-await", "nightly"] }```