What is the right way to combine several data sources in asyncronous code?

In order to study asyncronous programming with Rust, i decided to write a simple signle-threaded chat-server with tokio. Since every chat-user should be able to broadcast message to many other users, i used tokio::sync::broadcast::channel.

Since there should be ability to react both on new data from socket and on new data from broadcasting channel, i can't simple await for new data from socket, but should somehow combine awaiting of socket and broadcasting channel.

I found macro tokio::select!, designed for this purpose, and did following the example from this chapter
select in tokio - Rust ,
(third example, where two streams are combined):

    let listener = tokio::net::TcpListener::bind(bind_point).await?;
    let (queue_tx_orig, _) = tokio::sync::broadcast::channel(queue_size);
    loop {
        let (socket, peer) = listener.accept().await?;
        let queue_tx = queue_tx_orig.clone();
        tokio::spawn(async move {
            let (socket_rx, mut socket_tx) = socket.into_split();
            let mut socket_rx_stream = LinesStream::new(BufReader::new(socket_rx).lines());
            let mut queue_rx_stream = BroadcastStream::new(queue_tx.subscribe());
            loop {
                tokio::select! {
                    net_input = socket_rx_stream.next() => {
                        if let Some(line) = net_input {
                            ... // actions on new line from socket
                        } else {
                            break;
                        }
                    },
                    queue_input = queue_rx_stream.next() => {
                        if let Some(message) = queue_input {
                            ... // actions on new data from broadcast channel
                        }
                    }
                };
            }
            Ok::<(), std::io::Error>(())
        });
    }

This code worked well, but then i realized, that i don't see a big difference between
tokio_stream::StreamExt::next() and tokio::io::Lines::next_line()
or between
tokio_stream::StreamExt::next() and tokio::sync::broadcast::Receiver::recv()

So i have rewrited code, removing redundant stream wrappers from socket and broadcasting channel.

    let listener = tokio::net::TcpListener::bind(bind_point).await?;
    let (queue_tx_orig, _) = tokio::sync::broadcast::channel(queue_size);
    loop {
        let (socket, peer) = listener.accept().await?;
        let queue_tx = queue_tx_orig.clone();
        tokio::spawn(async move {
            let (socket_rx, mut socket_tx) = socket.into_split();
            let mut socket_rx_lines = BufReader::new(socket_rx).lines();
            let mut queue_rx = queue_tx.subscribe();
            loop {
                tokio::select! {
                    net_input = socket_rx_lines.next_line() => {
                        if let Some(line) = net_input? {
                            ... // actions on new line from socket
                        } else {
                            break;
                        }
                    },
                    queue_input = queue_rx.recv() => {
                        ... // actions on new data from broadcast channel
                    }
                };
            }
            Ok::<(), std::io::Error>(())
        });
    }

And this code also works well. Just in case i tested the situation, where both socket and broadcasting channel are ready before tokio::select! is executed - no data were lost. I think, this chapter matters:
select in tokio - Rust - since both tokio::sync::broadcast::Receiver::recv and tokio::io::Lines::next_line() are cancellation safe.

Is variant without streams acceptable? What are the pitfalls? What are the benefits of using streams?