What is the right way to combine several data sources in asynchronous code?

To study asynchronous programming in Rust, I decided to write a simple single-threaded chat server with tokio. Since every chat user should be able to broadcast messages to many other users, I used tokio::sync::broadcast::channel.

Each connection has to react both to new data from its socket and to new data from the broadcast channel, so I can't simply await the socket; I have to wait on the socket and the broadcast channel at the same time.

I found the tokio::select! macro, which is designed for this purpose, and followed the example from this chapter: select in tokio - Rust (third example, where two streams are combined):

    let listener = tokio::net::TcpListener::bind(bind_point).await?;
    let (queue_tx_orig, _) = tokio::sync::broadcast::channel(queue_size);
    loop {
        let (socket, peer) = listener.accept().await?;
        let queue_tx = queue_tx_orig.clone();
        tokio::spawn(async move {
            let (socket_rx, mut socket_tx) = socket.into_split();
            let mut socket_rx_stream = LinesStream::new(BufReader::new(socket_rx).lines());
            let mut queue_rx_stream = BroadcastStream::new(queue_tx.subscribe());
            loop {
                tokio::select! {
                    net_input = socket_rx_stream.next() => {
                        if let Some(line) = net_input {
                            ... // actions on new line from socket
                        } else {
                            break;
                        }
                    },
                    queue_input = queue_rx_stream.next() => {
                        if let Some(message) = queue_input {
                            ... // actions on new data from broadcast channel
                        }
                    }
                };
            }
            Ok::<(), std::io::Error>(())
        });
    }

This code worked well, but then I realized that I don't see a big difference between
tokio_stream::StreamExt::next() and tokio::io::Lines::next_line()
or between
tokio_stream::StreamExt::next() and tokio::sync::broadcast::Receiver::recv()

So I rewrote the code, removing the redundant stream wrappers from the socket and the broadcast channel.

    let listener = tokio::net::TcpListener::bind(bind_point).await?;
    let (queue_tx_orig, _) = tokio::sync::broadcast::channel(queue_size);
    loop {
        let (socket, peer) = listener.accept().await?;
        let queue_tx = queue_tx_orig.clone();
        tokio::spawn(async move {
            let (socket_rx, mut socket_tx) = socket.into_split();
            let mut socket_rx_lines = BufReader::new(socket_rx).lines();
            let mut queue_rx = queue_tx.subscribe();
            loop {
                tokio::select! {
                    net_input = socket_rx_lines.next_line() => {
                        if let Some(line) = net_input? {
                            ... // actions on new line from socket
                        } else {
                            break;
                        }
                    },
                    queue_input = queue_rx.recv() => {
                        ... // actions on new data from broadcast channel
                    }
                };
            }
            Ok::<(), std::io::Error>(())
        });
    }

This code also works well. Just in case, I tested the situation where both the socket and the broadcast channel are ready before tokio::select! is executed: no data was lost. I think this chapter explains why: select in tokio - Rust, since both tokio::sync::broadcast::Receiver::recv and tokio::io::Lines::next_line() are cancellation safe.

Is the variant without streams acceptable? What are the pitfalls? What are the benefits of using streams?
