Keep list of TcpStreams

I need to be able to store a list of TcpStreams that are accepted. I using a module level static variable, which I would prefer not to do. I don't know any other way though. The code does not compile as is shown here, the STREAMS.push is unsafe and generates an error.

use std::net::TcpListener;
use std::net::TcpStream;

static mut STREAMS: Vec<&TcpStream> = Vec::new();

fn main() -> std::io::Result<()> {
    let listener = TcpListener::bind("127.0.0.1:6543")?;

    loop {
        let (mut stream, _) = listener.accept()?;
        STREAMS.push(&stream);
        // Do other stuff with stream
    }

    Ok(())
}

fn another_fn() {
    // needs access to list of streams
}

Any suggestions on approaches for this. Are there standard Rust patterns? I'm new to Rust but very experienced in C... trying to learn the new patterns.

First of all, NEVER use static mut ever. It's near impossible to use correctly, hence taking &mut from it requires unsafe {} block.

You can't easily mutate the global variable, because the Rust programming language is made for multithreaded environment. Modifying global variable from multiple threads without proper synchronization is UB and you can't trigger UB using only safe Rust.

So how can I mutate shared data with concurrent access? Mutex. You can wrap your globals with std::sync::Mutex which never lets you take the inner's reference without holding the mutex guard. Currently it requires some initialization code and the Rust doesn't support to run code before main() by default, you should use another crate which supports lazy initialization like lazy_static or once_cell. Final code would be like below:

use std::sync::Mutex;
use std::net::{TcpListener, TcpStream};

use lazy_static::lazy_static;

lazy_static! {
    static ref STREAMS: Mutex<Vec<TcpStream>> = Mutex::new(Vec::new());
}

fn main() -> std::io::Result<()> {
    let listener = TcpListener::bind("...")?;

    for stream in listener.incoming() {
        let mut streams = STREAMS.lock().unwrap();
        streams.push(stream);
        // Do other stuffs with streams
    }

    Ok(())
}
1 Like

Another approach is to pass the streams to another_fn as an argument. It may require passing them through a few layers of functions, but that avoids global state, and it's usually worth if for clarity, predictability and testability of the code.


BTW, keep in mind that &TcpStream is a temporary borrow that is not stored. Vec<&TcpStream> means a vec that doesn't hold any streams, but only has a temporary view of streams that are supposed to be already stored somewhere else. You want Vec<TcpStream> to hold the streams.

Thank you both for your replies. They both help a bit.

Layering on to the problem is that I'm using async io (tokio). Wouldn't that mean that once the mutex lock is applied to the streams it would be locked for a potentially very long time, while waiting for the next connection?

Just for context, the program I'm writing is to a serial port to tcp bridge, where the serial port is broadcast to all the tcp clients and any tcp client can send data to the serial port. Async seemed to be the best way to accomplish that... but I'm new to Rust (if that is not obvious). I'm not new to async, network programming, etc. I've been doing that for years. So much so I wrote one of the early async library for C as part of an internal project.

The code that I have is this, which does not compile as it gives this error:

error[E0277]: `std::sync::MutexGuard<'_, std::vec::Vec<tokio::net::tcp::stream::TcpStream>>` cannot be sent between threads safely
   --> src/tcp_bridge.rs:21:9
    |
21  |         tokio::spawn(async move {
    |         ^^^^^^^^^^^^ `std::sync::MutexGuard<'_, std::vec::Vec<tokio::net::tcp::stream::TcpStream>>` cannot be sent between threads safely
    |
   ::: /Users/glenn/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.13/src/task/spawn.rs:123:21
    |
123 |         T: Future + Send + 'static,
    |                     ---- required by this bound in `tokio::task::spawn::spawn`
    |
use tokio::net::{TcpListener, TcpStream};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio;
use std::sync::Mutex;
use lazy_static::lazy_static;

lazy_static! {
    static ref STREAMS: Mutex<Vec<TcpStream>> = Mutex::new(Vec::new());
}

pub async fn tcp_io(addr: String)
        -> Result<(), std::io::Error> {
    let mut listener = TcpListener::bind(&addr).await?;
    loop {
        let (mut stream, _) = listener.accept().await?;
        let mut streams = STREAMS.lock().unwrap();
        streams.push(stream);

        tokio::spawn(async move {
            let mut buf = [0; 1024];

            loop {
                let n = stream
                    .read(&mut buf)
                    .await
                    .expect("failed to read data from stream");

                if n == 0 {
                    println!("closing connection {:?}", stream.peer_addr());
                    return;
                }

                stream
                    .write_all(&buf[0..n])
                    .await
                    .expect("failed to write data to stream");
                println!("wrote {} bytes", n);
            }
        });
    }
}

pub async fn tcp_write() {
    // code to use open streams and write stuff to them; called from outside this module
}

You should not keep a lock from an std mutex alive over an .await. Since you wish to call an async fn on the contained value, that's not really avoidable, and the tokio Mutex is meant for this case.

That said, I would strongly recommend not sharing your tcp streams between several tasks. Consider spawning a task per stream and using message passing instead.

This topic was automatically closed 90 days after the last reply. New replies are no longer allowed.