Need some help on best practices for shared mutable state

Allright, currently I'm trying to design a server struct, which relies heavily on shared mutable state. The rough structure looks something like the following:

pub struct Server {
    // worker pool to process incoming TCP and UDP messages, etc.
    thread_pool: ThreadPool,
    // handler functions which will be registered before the server is launched
    // the thread_pool will use those to process the requests
    handler_a: fn(...) -> ...,
    handler_b: fn(...) -> ...,
    handler_c: fn(...) -> ...,
    // and here we go: a sh*tload of configuration parameters
    main_server_name: Arc<Mutex<T>>,
    main_server_port: Arc<Mutex<S>>,
    language:  Arc<Mutex<T>>,
    mod_name: Arc<Mutex<X>>,
    mod_desc: Arc<Mutex<T>>,
    test_module: Arc<Mutex<Y>>,
    work_as_client: Arc<Mutex<Z>>,
    alive_counter: Arc<Mutex<W>>,
    ...
    // many, many more
}

The problem is, I can't design the server struct to be somewhat stateless like an REST API. The server can change it's internal configuration parameters dynamically depending on the incoming network connections (the handler functions will change it for example). What I don't like about that design is that whenever I want to read or write to an configuration parameter, I have the use the MutexGuard like so:

let mut language_guard = self.language.lock().unwrap();

I may be totally wrong, but this seems to be a bit tedious to do this every time (especially with getters and setters to encapsulate the acquiring of the mutex). So my next idea was to put all the configuration parameters in a separate struct:

pub struct ServerConfig {
    param_a,
    param_b,
    ...
}

pub struct Server {
    // worker pool to process incoming TCP and UDP messages, etc.
    thread_pool: ThreadPool,
    // handler functions which will be registered before the server is launched
    // the thread_pool will use those to process the requests
    handler_a: fn(...) -> ...,
    handler_b: fn(...) -> ...,
    handler_c: fn(...) -> ...,
    config: Arc<Mutex<ServerConfig>>,
}

... 
// and to use it
let mut config_guard = self.config.lock().unwrap();
println!("Current language = {}", config_guard.language);
config_guard.work_as_client = false;
...

Which has the disadvantage that every time a need to read or update a configuration parameter, I'm locking the whole config variable, which seems to me would be bad for fast multithreading.

Which leads to my actual question: are there any best practices for Rust on how to handle my problem so this can be achieved in a "smoother" way?

Best regards
0xBIT

Moving config to another struct is a good idea. You probably don't need Arc in there — use Arc<Server> to share the whole server object, not its fields. For config use RwLock<ServerConfig> (rwlock instead of mutex, so that your server doesn't become single-threaded). If you have counters, you can also use Atomic instead of locks.

Another option is to use ArcSwap that lets you replace the whole object when it changes. Works great for config if you can tolerate old config temporarily staying in use for requests that already used it.

2 Likes

RwLock seems to be just what I was looking for! However, I struggle with implementing to pass the Arc<T> to my handler functions (ThreadControl::Job). My ThreadPool is defined as follows:

use std::thread;

/// This enum type implements the different commands a worker thread can receive from its parent thread.
/// The main thread will send an instance of this type over the `mpsc` channel to the worker thread, which
/// will then react accordingly.
enum ThreadControl<T> {
    /// A job for the worker thread to do: this will be a closure to execute
    Job(Box<dyn FnOnce(Arc<T>) + Send + 'static>),
    /// A stop signal for the worker thread to stop it's work.
    Stop,
}

/// This type represents a worker thread, which will be used to handle different tasks, commanded by the
/// main thread. A worker thread is identified by it's id, and it holds a `thread::JoinHandle`
struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    /// Constructs a new `Worker`, by spawning a new thread. The main loop of this new thread, will
    /// await commands from the `job_receiver` atomic reference-counted receiver channel.
    fn new<T>(
        id: usize,
        job_receiver: Arc<Mutex<mpsc::Receiver<ThreadControl<T>>>>,
        ctx: Arc<T>,
    ) -> Self
    where
        T: Send + Sync,
    {
        let thread = thread::Builder::new()
            .name(format!("worker-{}", id))
            // this here leads to the "may-not-live-long-enough" error
            .spawn(move || loop {
                let thread_control = job_receiver.lock().unwrap().recv().unwrap();
                match thread_control {
                    ThreadControl::Stop => {
                        println!("Worker worker-{} received stop signal - terminating...", id);
                        break;
                    }
                    ThreadControl::Job(job) => {
                        println!("Worker worker-{} go a new job! executing it now...", id);
                        job(ctx);
                    }
                }
            })
            .unwrap();
        Self { id, thread }
    }
}

/// With this type it's possible to create a pool of worker threads, that can handle work in parallel.
/// The communication between the worker threads and the main thread is done via a `mpsc` channel.
/// Each thread in the pool receives a cloned atomic reference-counted receiver channel, over which the
/// main thread can control the behavior of its worker threads, by sending "commands" over the `mpsc` channel.
pub struct ThreadPool<T> {
    /// the pool of worker threads
    workers: Vec<Worker>,
    /// the sender channel, to command the threads to specific jobs
    thread_control: mpsc::Sender<ThreadControl<T>>,
    /// number of the worker threads in the thread pool.
    size: usize,
}

impl<T> ThreadPool<T> where T: Sync + Send {
    pub fn new(pool_size: usize, ctx: &Arc<T>) -> Self {
        let mut workers = Vec::with_capacity(pool_size);

        // Job
        let (thread_control, job_receiver) = mpsc::channel();
        let job_receiver = Arc::new(Mutex::new(job_receiver));

        for id in 0..pool_size {
            workers.push(Worker::new(id, Arc::clone(&job_receiver), Arc::clone(ctx)));
        }

        Self {
            workers,
            thread_control,
            size: pool_size,
        }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce(Arc<T>) + Send + 'static,
    {
        let closure = Box::new(f);
        self.thread_control
            .send(ThreadControl::Job(closure))
            .unwrap();
    }

    pub fn stop(&self) {
        self.thread_control.send(ThreadControl::Stop).unwrap();
    }
}

This won't compile, because Arc<T> doesn't have a 'static lifetime:

error[E0310]: the parameter type `T` may not live long enough
  --> /home/Projects/Rust/libbase/src/misc/thread_pool.rs:35:14
   |
24 |     fn new<T>(
   |            - help: consider adding an explicit lifetime bound...: `T: 'static`
...
35 |             .spawn(move || loop {
   |              ^^^^^ ...so that the type `[closure@/home/Projects/Rust/libbase/src/misc/thread_pool.rs:35:20: 47:14]` will meet its required lifetime bounds

For more information about this error, try `rustc --explain E0310`.
error: could not compile `libbase` due to previous error

But the lifetime of Arc<T>, or to be more precise, Arc<Server> will never be 'static. I understand why this doesn't compile, since one of my threads may run longer than the main thread, in which Server lives, but how can this be fixed?

Arc<T> is 'static if the T does not reference anything else. So if the main threan creates an Arc<Server> it should be able to spread that to the workers and whichever drops the arc last will drop the actual Server. So it should not be a problem. But the compiler cannot know that when it sees your struct ThreadPool<T>.

Try changing where T: Sync + Send to where T: 'static + Sync + Send.

3 Likes

Didn't know this could be fixed by adding the static lifetime, since I thought the T has then to be something like &'static str or static MAX_U32: u32 = 0xFFFFFFFF;. Perhaps I also mixed it up with the static reference lifetimes. Anyhow, that solved the problem. Thanks a lot :slight_smile:

Expresed another way, a reference may have a non-static lifetime, since you must not keep your copy of the reference after the value it points at are gone, but something that is not (and does not contain) a reference is always 'static, since you may keep the value as long as you wish.

1 Like

By default T can be anything, including a temporary reference. T: 'static bans temporary references. Lifetimes don't apply at all to types without references, so e.g. i32: 'static is always satisfied. Not because i32 is 'static, but because such bound doesn't mean anything for it.

2 Likes