Moving a Mutex between threads?

I'm trying to write a simple TCP-based server using tokio where the server holds a list of Clients and the Clients have access to the Server. This is necessary so that the Server can check the Clients for something (for instance, whether a Client with a specific name exists) and so that Clients can call into the Server for these checks.

After trying various approaches, I eventually arrived at using Arc<Mutex<T>> to share the instances with each other, which seems to be working fine, though I'm not sure that's the right choice here and I'd like to know if there's a better-suited mechanism for it.

Furthermore, this makes it impossible to send the Client task to tokio::spawn, as the compiler complains that the Mutex cannot be "safely sent between threads". As an alternative, I tried tokio::sync::Mutex instead of std::sync::Mutex, which technically should work as it provides the Send trait; however, its API is inherently async, meaning I'd have to mark every function that accesses it as async even when those functions aren't otherwise async, which makes it a bit cumbersome.

I also realized that if I pass the mutex to tokio::spawn, lock it, and call Client::task on it, the task will essentially hold the lock indefinitely, which makes me suspect my approach here is inherently flawed and I should try something else.

Here's a relevant minimal example of what I'm trying to achieve (the troubling piece of code is commented out):

use std::sync::{Arc, Mutex};
use std::vec::Vec;

use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::net::TcpListener;
use tokio::net::TcpStream;

struct Client {
    pub name: String,
    server: Arc<Mutex<Server>>,
}

impl Client {
    pub fn new(name: String, server: Arc<Mutex<Server>>) -> Client {
        Client { name, server }
    }

    pub async fn task(&mut self, mut stream: TcpStream) {
        let (reader, _writer) = stream.split();
        let mut line = String::new();
        let mut buf_reader = BufReader::new(reader);

        loop {
            let size = buf_reader.read_line(&mut line).await.unwrap();
            if size == 0 {
                break;
            } else if self.name.is_empty() {
                let server = self.server.lock().unwrap();
                if server.is_name_used(&line) {
                    println!("Name already in use.");
                } else {
                    self.name = line.clone();
                    println!("Set name to: {}", self.name);
                }
            } else {
                println!("Message: {}", line);
            }

            line.clear();
        }
    }
}

struct Server {
    pub name: String,
    clients: Vec<Arc<Mutex<Client>>>,
}

impl Server {
    pub fn new() -> Server {
        Server {
            name: "Test".to_string(),
            clients: Vec::new(),
        }
    }

    pub async fn accept(self) -> Result<(), Box<dyn std::error::Error>> {
        let server = Arc::new(Mutex::new(self));
        let mut acceptor = TcpListener::bind("127.0.0.1:1234").await?;
        loop {
            let (stream, _) = acceptor.accept().await?;
            let client = Arc::new(Mutex::new(Client::new("Test".to_string(), server.clone())));

            /*let c = client.clone();
            tokio::spawn(async move {
                client.lock().unwrap().task(stream).await;
            });*/

            server.lock().unwrap().clients.push(client);
        }
    }

    pub fn is_name_used(&self, name: &str) -> bool {
        self.clients
            .iter()
            .any(|c| c.lock().unwrap().name == name)
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    Server::new().accept().await
}

Any pointers what I could use here instead to achieve my goal?

The problem is not that Mutex cannot be sent between threads (it can), but that MutexGuard cannot, and that your async block does an await while a MutexGuard is alive.

tokio::sync::Mutex does not have this problem, though you'll need to change some other code to adapt to its async API.
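For illustration, a minimal sketch of what that adaptation looks like, reusing the Server type from the question (just the shape of the API, not a complete fix):

use std::sync::Arc;
use tokio::sync::Mutex; // the async mutex; its guard is Send

// Reuses the Server type from the question above.
async fn check_name(server: Arc<Mutex<Server>>, name: &str) -> bool {
    // lock() returns a future, so the guard is acquired with .await
    // rather than .unwrap(), and it may be held across .await points.
    let guard = server.lock().await;
    guard.is_name_used(name)
}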

Alternately, you could use a LocalSet and tokio::task::spawn_local to ensure that the async block doesn't move between threads during its execution.
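A minimal sketch of that pattern (the task body here is just a placeholder):

use tokio::task::LocalSet;

#[tokio::main]
async fn main() {
    // spawn_local must be called from within a LocalSet context.
    let local = LocalSet::new();
    local
        .run_until(async {
            tokio::task::spawn_local(async {
                // This future never migrates to another thread, so it may
                // hold !Send types (Rc, RefCell, a std MutexGuard) across
                // .await points without tripping the Send bound.
            })
            .await
            .unwrap();
        })
        .await;
}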

Hi, thanks for clarifying that it's MutexGuard and not Mutex that cannot be sent between threads.

The code indeed builds fine with tokio::sync::Mutex; however, as you mentioned, it requires changes to adapt to the async API, which seems a little odd for functionality that's not inherently async (such as iterating over a collection). I've been wondering if there's a better mechanism for sharing this data that wouldn't involve an async mutex.

Furthermore, the is_name_used function now blocks indefinitely because the lock cannot be acquired (as I suspected, the lock is already held), meaning messages can't really be processed if I want the Client to reach out to the Server.
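Reduced to its essence, what I'm hitting is re-locking a mutex that the current task already holds, e.g.:

use tokio::sync::Mutex;

#[tokio::main]
async fn main() {
    let m = Mutex::new(0);
    let _held = m.lock().await; // held for the duration of the task
    // Awaiting the same lock again never completes: this future waits
    // for a guard that this very task is still holding.
    let _stuck = m.lock().await;
}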

I should mention that I also tried a LocalSet with Rc<RefCell<>> instead, and it technically builds too, but it manifests another problem: a panic that appears to be a double borrow (as opposed to an indefinite wait on a mutex lock). I guess the design is just inherently flawed and I should approach it from another angle, but I'm completely lost as to how it could work. Perhaps I should update the title, as that's not really the actual issue here?
Here's the updated code: Rust Playground

The only way to correctly keep a lock through an await point is to use the Tokio async primitives such as their Mutex.

Using a local set to make the ordinary Mutex work is a recipe for deadlocks.

You might want to make it so that you don't need a mutex around the entire Client, but instead have mutexes only around the individual fields that get mutated. Then functions like Client::task could take &self instead of &mut self, and you wouldn't need an exclusive lock to call them.

The same might apply to the Server.
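A minimal sketch of that layout, assuming name is the only field that gets mutated:

use std::sync::Mutex;

struct Client {
    name: Mutex<String>, // only the mutable state sits behind a lock
}

impl Client {
    pub fn set_name(&self, name: String) {
        // The guard lives only for this statement and is never held
        // across an .await, so std::sync::Mutex is fine here.
        *self.name.lock().unwrap() = name;
    }

    pub fn name(&self) -> String {
        self.name.lock().unwrap().clone()
    }
}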

I was thinking about another approach where I move the task logic into the generator so that I only have to borrow and/or lock when there's new data available, but that made me realize this design is also inherently flawed: any time I borrow and/or lock when data is available and then call Server::is_name_used while that lock is still held, acquiring the second lock will always end in a deadlock/double borrow.

What I tried instead was using a Mutex for individual fields, as suggested, but I ran into an issue where I couldn't assign to these fields: await would return a MutexGuard, and the LHS of the assignment would become invalid.

The more I think about it, the more I'm convinced that you can only have one or the other but not both (shared mutable state and tokio tasks), and that I'm just too fixated on this approach while another (maybe simpler and cleaner) one exists that actually works?

You can dereference a MutexGuard using * to assign a new value to the Mutex. For example:

*mutex.lock().await = Vec::new();

(This is true for both std::sync::MutexGuard and tokio::sync::MutexGuard.)

Update: There's an open issue about improving the error messages in this situation.

Even in that case it seems like I either end up with a deadlock or with an issue borrowing an Arc as mutable (Rust Playground). At this point I honestly feel like the whole architecture is flawed and I should rethink my approach, even though in theory it's sound.

If you change task to take &self then your code compiles, though I haven't checked whether it deadlocks.

You should try very hard not to hold locks across an await. Even if you get it to work, it's terrible for performance: for however long it takes the awaited event to happen, your code sits there holding the lock while doing absolutely nothing with the data it protects.
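A sketch of the usual pattern, reusing the types from the question (send_server_name is a hypothetical helper): copy what you need out of the lock, let the guard drop, and only then await.

use std::sync::{Arc, Mutex};
use tokio::io::AsyncWriteExt;
use tokio::net::TcpStream;

async fn send_server_name(
    server: Arc<Mutex<Server>>,
    stream: &mut TcpStream,
) -> std::io::Result<()> {
    let name = {
        let guard = server.lock().unwrap();
        guard.name.clone()
    }; // the guard is dropped here, before any await

    stream.write_all(name.as_bytes()).await
}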

This version, where neither Server nor Client is inside a Mutex (though both of them contain mutexes), gets rid of those long-held locks.
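(The playground code itself isn't reproduced here, but the shape is roughly this sketch: only the fields that actually change live behind short-lived locks.)

use std::sync::{Arc, Mutex};

struct Server {
    clients: Mutex<Vec<Arc<Client>>>, // locked briefly to push or search
}

struct Client {
    name: Mutex<String>, // locked briefly to read or set the name
    server: Arc<Server>,
}

impl Server {
    fn is_name_used(&self, name: &str) -> bool {
        self.clients
            .lock()
            .unwrap()
            .iter()
            .any(|c| *c.name.lock().unwrap() == name)
    }
}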

That does indeed work as expected now. I tested it with a simple Python script that hammers the server with as many connections as possible, each sending a bunch of messages, until I exhausted the number of open sockets, so I guess we finally have something that works with the initial architecture and doesn't grind to a halt performance-wise.

Thanks to everyone who chimed in to help with this.

This however creates another issue: since task no longer takes &mut self, the functions it calls can't take &mut self either, which is a problem when trying to send data back, because:

  • The writer cannot be moved, because tokio::io::WriteHalf does not implement Copy.
  • The writer cannot be borrowed mutably, because self is not mutable.
  • task cannot be changed back to &mut self, because that brings back the original issue.

This is how I tried to send data using the writer:

I've been fighting the same thing. The best I came up with is to put the recv future in a loop with a tokio::time::timeout (my incredibly messy code is here: rglslsp-codec/src/app.rs · impl-replies · Eugene Marcotte / rglslsp · GitLab). This releases the guard so send works, but it feels kludgy.

Eagerly watching this thread cause I have been googling for days :blush:

Consider implementing Clone on the Client type so that the lock only needs to be held for the duration of cloning the Client; then pass the cloned Client into the spawn/async block.

let mut c = {
    // Lock is only held for the duration of this block
    let guard = client.lock().unwrap();

    guard.clone()
};

tokio::spawn(async move {
    c.task(stream).await;
});

Unfortunately I don't think implementing the Clone trait for the Client struct will work. As we'll see later, the requirements change so that I also need to store the WriteHalf<TcpStream> in the Client, and since WriteHalf doesn't implement Clone, the Client can't either.

Furthermore, if we create a duplicate of the object, I will no longer be able to take its address to find it in the server's list (consider a scenario where the Client calls a Server function and passes self so that the Server can find it in the clients vector), for instance:
Client:

pub fn foo(&self, nick: &str) {
    self.server.map_nick(nick.to_string(), self);
}

Server:

pub fn map_nick(&self, name: String, client: &Client) {
    let index = self
        .clients
        .iter()
        // Compare by address, not by value; Arc::as_ptr reads the raw
        // pointer without leaking the Arc the way Arc::into_raw would.
        .position(|c| std::ptr::eq(Arc::as_ptr(c), client));
    // snip
}

The writer is in a mutex, so by locking the mutex you can get an &mut reference to it (Playground):

pub async fn send(&self, message: String) {
    // Assumes writer: Mutex<Option<WriteHalf<TcpStream>>> (tokio's Mutex).
    if let Some(writer) = &mut *self.writer.lock().await {
        writer.write_all(message.as_bytes()).await;
    }
}

If you clone an Arc<Client> then you will get a new Arc pointing to the same Client, so it will still contain the same raw pointer.
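A quick self-contained check of that claim (using String as a stand-in for Client):

use std::sync::Arc;

fn main() {
    let a = Arc::new(String::from("client"));
    let b = Arc::clone(&a); // a second handle to the same allocation
    assert!(Arc::ptr_eq(&a, &b));                 // same pointee
    assert_eq!(Arc::as_ptr(&a), Arc::as_ptr(&b)); // same raw pointer
}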

Excellent, getting &mut to the writer worked. I think that covers everything for now.