Defining a Global Mutable Structure to be Used Across Several Threads

Hello Rustaceans!

This is my first adventure into Rust and I'm excited to be a part of this growing community, and even more excited about everything Rust has to offer. I come from a C background, so please bear with me: my functional and object-oriented language experience is sparse, to say the least.

For my first project I am attempting to write a reliable UDP server, which I eventually plan to use in other projects. It's really a learning exercise for me. I have the core of the UDP server & client set up: the client can send messages and the server will send a cloned reply. Right now the server is pretty basic and all of my effort has gone into getting the client set up (and yes, they do share a lot of the same functionality).

#Repo
Link to my GitHub Repo for any curious cats. Latest commit doesn't compile, you'll have to use the second most recent commit (f8176f0).

#Background
I am using mio and mioco to do some of the heavy lifting. Mio handles all of the UDP socket work, while I use mioco to pass messages between threads for them to act on. One thread listens on a socket while another is prepared to send data on that same socket. A third thread listens for user input from the command line, meant only for debugging the client and firing off actions.

For the reliability portion (See netbuffers.rs), I have a structure defined which keeps track of 32 transactions, specifically:

  1. What packets have been sent for this frame set (sent_packets and tx_packets)?
  2. What packets have been received by the server (rx_acks)?
  3. What packets need to be resent (high_priority_acks)?

We'll call this structure NetworkBuffer (well because that's its name).

So in C, I would declare a static global instance of NetworkBuffer and then call helper functions to access it. Something like...

static NetworkBuffer udp_network_buffer;

void remove_packet(int some_index) {
    udp_network_buffer.sent_packets[some_index] = 0;
}

void insert_packet(int some_index, Packet packet) {
    udp_network_buffer.sent_packets[some_index] = packet;
}

int main(...) {
...
}

All I need to do is to include the header file which defined udp_network_buffer, and anybody could modify it via the helper functions and semaphores (and yes, I know this is not safe, especially in a multi-threaded environment). Okay great.

#What do I want?

This is what I want, stated more clearly: I want NetworkBuffer to be created when the program starts and to remain alive until the program terminates (something something lifetimes). That is truly its lifetime. Then, when the listening thread receives a packet, it can update this struct ("I got an ACK", or whatever else it may need). The same goes for the tx thread: it could modify the buffer when packets are sent.

#What I have tried

But Rust is not C. I've been trying to get this to work but I haven't had any luck. Here's what I've tried:

  1. I've tried to specify the lifetime of a statically declared instance of my structure but I couldn't get it to work. I haven't used lifetimes yet in practice but I do know their purpose in theory. Lifetimes are used to tell the compiler how long variables should live and that relation with the functions that use them. Well something like that.
  2. I tried declaring an object in the root function main() and passing it to each thread handler, but the problem is that ownership moves into the first thread, because the type does not implement Copy. But I can't implement Copy, because it seems overly complicated: deriving or implementing Copy for NetworkBuffer requires that Copy be implemented for all of its members, some of them being a Vec<T>.

How would this problem be solved here? Ultimately I need a way to get this reliability scheme implemented so that the sending and receiving threads can access it.

I hope I have explained it well enough. Thanks.

Well, for your second attempt, implement Clone for it instead (similar to Copy, but explicit via .clone()), or pass a reference to it so that nothing is moved. You can also use unsafe to work with a mutable static variable like you would in C; you can mutate it inside an unsafe {} block. I can't say anything about that last option in a multithreaded environment, though. Unsafe should be the last thing to consider, when everything else fails. Good luck!
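
One caveat about the Clone route may be worth sketching: a clone is an independent copy, so a change made by one thread is not visible through the original. The struct and field below are hypothetical stand-ins for the real NetworkBuffer, not code from the repo.

```rust
use std::thread;

// Hypothetical stand-in for the real NetworkBuffer.
#[derive(Clone, Default)]
struct NetworkBuffer {
    sent_packets: Vec<u32>,
}

/// Hands a clone to a worker thread, lets the worker modify it, and returns
/// (length of the original, length of the worker's copy).
fn clone_into_thread() -> (usize, usize) {
    let original = NetworkBuffer::default();
    let mut copy = original.clone();

    let worker = thread::spawn(move || {
        // Only the worker's own copy is modified here.
        copy.sent_packets.push(1);
        copy.sent_packets.len()
    });
    let copy_len = worker.join().unwrap();

    (original.sent_packets.len(), copy_len)
}

fn main() {
    let (original_len, copy_len) = clone_into_thread();
    println!("original: {}, copy: {}", original_len, copy_len);
}
```

If the threads are supposed to see each other's updates, cloning the buffer itself is probably not what is wanted; sharing one instance is.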

I think you are looking for a combination of lazy-static and std::sync::Mutex:

// Put this in your Cargo.toml:
//
// ```
// [dependencies]
// lazy_static = "0.2"
// ```
#[macro_use] extern crate lazy_static;

use std::sync::Mutex;
use std::thread;

struct Packet;

struct Buffer {
    xs: Vec<Packet>
}

impl Buffer {
    fn new() -> Buffer { Buffer { xs: Vec::new() } }

    fn insert(&mut self, p: Packet) {
        self.xs.push(p)
    }
}

lazy_static! {
    static ref GLOBAL_BUFFER: Mutex<Buffer> = Mutex::new(Buffer::new());
}

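fn main() {
    let first = thread::spawn(|| {
        let mut guard = GLOBAL_BUFFER.lock().unwrap();
        guard.insert(Packet);
    });
    let second = thread::spawn(|| {
        let mut guard = GLOBAL_BUFFER.lock().unwrap();
        guard.insert(Packet);
    });
    // Wait for both threads; otherwise main may return before they run.
    first.join().unwrap();
    second.join().unwrap();
}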
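
For readers on a recent toolchain, here is a rough std-only sketch of the same idea. It assumes Rust 1.63 or later, where Mutex::new is a const fn, and it assumes the buffer's constructor can itself be made a const fn (true here because Vec::new is const). The helper insert_from_threads is a made-up name for illustration.

```rust
use std::sync::Mutex;
use std::thread;

struct Packet;

struct Buffer {
    xs: Vec<Packet>,
}

impl Buffer {
    // `const fn` so that it can be called in a static initializer.
    const fn new() -> Buffer {
        Buffer { xs: Vec::new() }
    }

    fn insert(&mut self, p: Packet) {
        self.xs.push(p);
    }
}

// No macro needed: the whole initializer is a constant expression.
static GLOBAL_BUFFER: Mutex<Buffer> = Mutex::new(Buffer::new());

/// Spawns `n` threads that each insert one packet, waits for all of them,
/// and returns the number of packets now stored in the global buffer.
fn insert_from_threads(n: usize) -> usize {
    let handles: Vec<_> = (0..n)
        .map(|_| {
            thread::spawn(|| {
                GLOBAL_BUFFER.lock().unwrap().insert(Packet);
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    let len = GLOBAL_BUFFER.lock().unwrap().xs.len();
    len
}

fn main() {
    println!("packets stored: {}", insert_from_threads(2));
}
```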

To expand on this a bit, I think there are two problems here: how to work with global variables, and how to share a value between different threads.

As for global variables, they are pretty similar to the ones in C: you just say static X: i32 = 92;. But unlike C, Rust requires that all your variables are initialized before the first access. For global variables, that means they must be initialized at the point of declaration. Technically, I believe this also happens in C via zero initialization; however, in Rust not every type can be safely zero-initialized (for one, references must be non-null), so you must provide a constant expression for initialization yourself. The expression must be evaluable at compile time because, unlike C++, there is no life before main in Rust.
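
A small sketch of what counts as a valid initializer, assuming a reasonably recent compiler. The names WINDOW, default_window, PACKETS_SENT and record_send are invented for illustration.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// A plain constant expression works as an initializer.
static X: i32 = 92;

// So does a call to a `const fn`, which is evaluated at compile time.
const fn default_window() -> usize {
    32
}
static WINDOW: usize = default_window();

// Atomics have const constructors and can be updated through a shared
// reference, so this global is mutable without any `unsafe`.
static PACKETS_SENT: AtomicUsize = AtomicUsize::new(0);

/// Increments the global counter and returns its new value.
fn record_send() -> usize {
    PACKETS_SENT.fetch_add(1, Ordering::SeqCst) + 1
}

fn main() {
    println!("X = {}, WINDOW = {}", X, WINDOW);
    println!("packets sent so far: {}", record_send());
}
```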

This brings up a question: what if you need to have a global for a type which can not be initialized with a const expr? The typical solution is to use a lazy-static macro, which initializes a global on the first access.
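
As a hedged illustration of initialization on first access without an external crate, the sketch below uses std::sync::LazyLock, which assumes Rust 1.80 or later. The ACKS map and record_ack helper are hypothetical names.

```rust
use std::collections::HashMap;
use std::sync::{LazyLock, Mutex};

// HashMap::new() is not a const fn, so it cannot appear directly in a
// static initializer. LazyLock runs the closure on the first access instead.
static ACKS: LazyLock<Mutex<HashMap<u32, bool>>> =
    LazyLock::new(|| Mutex::new(HashMap::new()));

/// Marks a sequence number as acknowledged and returns how many
/// sequence numbers are recorded so far.
fn record_ack(seq: u32) -> usize {
    let mut acks = ACKS.lock().unwrap();
    acks.insert(seq, true);
    acks.len()
}

fn main() {
    println!("acks recorded: {}", record_ack(7));
}
```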

How does one share a value between threads? The first thing is to ensure that the shared value actually lives as long as any thread that uses it. This is trivially true for global variables:

use std::thread;

static X: i32 = 92;

fn main() {
    // works fine, because X is always there
    thread::spawn(|| println!("{}", X));
}

It is not so for locals:

use std::thread;

fn main() {
    let x = 92;
    thread::spawn(|| {
        // If the thread outlives the outer scope
        // then the `x` will be destroyed, so Rust
        // rightfully protects us from leg shooting.
        println!("{}", x)
    });
}

To handle this, you either need to make sure that the thread exits before the enclosing function returns, or make sure that the data lives as long as all the threads do. The first can be handled with crossbeam, the second with Arc:

extern crate crossbeam;

fn main() {
    let x = 92;
    crossbeam::scope(|scope| {
        scope.spawn(|| println!("{}", x));
    });
    // crossbeam guarantees that the spawned thread will end at this point,
    // so that's why we are allowed to access locals of the enclosing function.
    println!("{}", x);
}

use std::sync::Arc;
use std::thread;

fn main() {
    let x = Arc::new(92);
    let xx = x.clone();
    thread::spawn(move || println!("{}", *xx));
    // the spawned thread may or may not be alive at this point, but the `x`
    // is certainly alive, because it has a reference count of at least one.
    println!("{}", *x);
}
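
On newer toolchains the scoped-thread approach is also available in the standard library as std::thread::scope (assuming Rust 1.63 or later). A minimal sketch, where sum_in_scoped_threads is an invented helper:

```rust
use std::thread;

/// Sums a slice using two scoped threads that borrow the data directly.
fn sum_in_scoped_threads(data: &[i32]) -> i32 {
    let (left, right) = data.split_at(data.len() / 2);
    thread::scope(|s| {
        // Both threads borrow from the caller's data; no Arc or clone needed.
        let a = s.spawn(|| left.iter().sum::<i32>());
        let b = s.spawn(|| right.iter().sum::<i32>());
        a.join().unwrap() + b.join().unwrap()
    })
    // All threads spawned in the scope have finished by this point.
}

fn main() {
    let data = vec![1, 2, 3, 4, 5];
    println!("sum = {}", sum_in_scoped_threads(&data));
    // `data` was only borrowed, so it is still usable here.
    println!("data = {:?}", data);
}
```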

Ok, suppose you succeed in sharing a value between threads! Then you inevitably notice that you share only & (constant) references. What if you need mutable access to the data? Can you share a &mut? It turns out that you can't, because the real distinction between & and &mut is not const vs. mutable, but aliased (shared) vs. unique. That is, &mut is a unique reference, and so by definition it cannot be shared.

However, there is a trick! A mutex guarantees that if you enter a critical section, you are the only one there. In other words, several parties can share a mutex, but as long as anybody succeeds in locking it, they get unique access to the critical section. That is, a mutex transforms & into &mut via runtime checks. This should explain how the following works:

use std::sync::Arc;
use std::sync::Mutex;
use std::thread;

fn main() {
    let x = Arc::new(Mutex::new(92));
    let xx = x.clone();
    thread::spawn(move || {
        let mut guard = xx.lock().unwrap();
        *guard += 1
    });
    println!("{}", *x.lock().unwrap());
}
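
Note that the snippet above does not wait for the spawned thread, so main may print either 92 or 93. A slightly longer sketch that joins every thread before reading the value gives a deterministic result; count_with_threads is a hypothetical helper name.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Spawns `threads` workers that each increment a shared counter
/// `per_thread` times, then returns the final count.
fn count_with_threads(threads: usize, per_thread: usize) -> usize {
    let counter = Arc::new(Mutex::new(0usize));

    let handles: Vec<_> = (0..threads)
        .map(|_| {
            // Each worker gets its own handle to the same counter.
            let counter = Arc::clone(&counter);
            thread::spawn(move || {
                for _ in 0..per_thread {
                    *counter.lock().unwrap() += 1;
                }
            })
        })
        .collect();

    // Wait for every worker before reading the result.
    for handle in handles {
        handle.join().unwrap();
    }

    let total = *counter.lock().unwrap();
    total
}

fn main() {
    println!("total = {}", count_with_threads(4, 100));
}
```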

That's it! :)


Glad you wrote this. Just yesterday I was grumbling about needing to use an Arc to pass some large immutable data structures from my main function to a few threads. Scoped threads are a great alternative. Thanks!

@matklad Thank you so much for your very detailed response. You gave more than 100% on this post and I cannot express how grateful I am.

I will try this out very soon, unfortunately I do not have time now to do that. Thank you again.

EDIT:
I could not wait, so I tried this out. From my initial testing it appears to be working :)
One change I had to make was to make my static definition public so that other modules could reference it. Using matklad's example:

lazy_static! {
    pub static ref GLOBAL_BUFFER: Mutex<Buffer> = Mutex::new(Buffer::new());
}

// Return the mutex itself rather than the type generated by the macro.
pub fn get_buffer() -> &'static Mutex<Buffer> {
    &GLOBAL_BUFFER
}

Also instead of unwrapping the acquired buffer, I used a match on Ok and Err. But I understand he quickly put something together.
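
A rough sketch of what matching on the lock result could look like. It assumes that recovering the data from a poisoned lock is acceptable for the application, which may not hold for every program. COUNTER, lock_counter and increment are made-up names, and the const Mutex::new in a static assumes Rust 1.63 or later.

```rust
use std::sync::{Mutex, MutexGuard};

static COUNTER: Mutex<u32> = Mutex::new(0);

/// Locks the counter without panicking on a poisoned mutex.
fn lock_counter() -> MutexGuard<'static, u32> {
    match COUNTER.lock() {
        Ok(guard) => guard,
        // lock() returns Err only if another thread panicked while holding
        // the lock; into_inner() hands back the guard anyway.
        Err(poisoned) => poisoned.into_inner(),
    }
}

/// Increments the counter and returns its new value.
fn increment() -> u32 {
    let mut guard = lock_counter();
    *guard += 1;
    *guard
}

fn main() {
    println!("counter = {}", increment());
}
```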

Thank you all again.