In order to study asyncronous programming with Rust, i decided to write a simple signle-threaded chat-server with tokio. Since every chat-user should be able to broadcast message to many other users, i used tokio::sync::broadcast::channel.
Since there should be ability to react both on new data from socket and on new data from broadcasting channel, i can't simple await for new data from socket, but should somehow combine awaiting of socket and broadcasting channel.
I found macro tokio::select!, designed for this purpose, and did following the example from this chapter
select in tokio - Rust ,
(third example, where two streams are combined):
let listener = tokio::net::TcpListener::bind(bind_point).await?;
let (queue_tx_orig, _) = tokio::sync::broadcast::channel(queue_size);
loop {
let (socket, peer) = listener.accept().await?;
let queue_tx = queue_tx_orig.clone();
tokio::spawn(async move {
let (socket_rx, mut socket_tx) = socket.into_split();
let mut socket_rx_stream = LinesStream::new(BufReader::new(socket_rx).lines());
let mut queue_rx_stream = BroadcastStream::new(queue_tx.subscribe());
loop {
tokio::select! {
net_input = socket_rx_stream.next() => {
if let Some(line) = net_input {
... // actions on new line from socket
} else {
break;
}
},
queue_input = queue_rx_stream.next() => {
if let Some(message) = queue_input {
... // actions on new data from broadcast channel
}
}
};
}
Ok::<(), std::io::Error>(())
});
}
This code worked well, but then i realized, that i don't see a big difference between
tokio_stream::StreamExt::next() and tokio::io::Lines::next_line()
or between
tokio_stream::StreamExt::next() and tokio::sync::broadcast::Receiver::recv()
So i have rewrited code, removing redundant stream wrappers from socket and broadcasting channel.
let listener = tokio::net::TcpListener::bind(bind_point).await?;
let (queue_tx_orig, _) = tokio::sync::broadcast::channel(queue_size);
loop {
let (socket, peer) = listener.accept().await?;
let queue_tx = queue_tx_orig.clone();
tokio::spawn(async move {
let (socket_rx, mut socket_tx) = socket.into_split();
let mut socket_rx_lines = BufReader::new(socket_rx).lines();
let mut queue_rx = queue_tx.subscribe();
loop {
tokio::select! {
net_input = socket_rx_lines.next_line() => {
if let Some(line) = net_input? {
... // actions on new line from socket
} else {
break;
}
},
queue_input = queue_rx.recv() => {
... // actions on new data from broadcast channel
}
};
}
Ok::<(), std::io::Error>(())
});
}
And this code also works well. Just in case i tested the situation, where both socket and broadcasting channel are ready before tokio::select! is executed - no data were lost. I think, this chapter matters:
select in tokio - Rust - since both tokio::sync::broadcast::Receiver::recv and tokio::io::Lines::next_line() are cancellation safe.
Is variant without streams acceptable? What are the pitfalls? What are the benefits of using streams?