Potential Deadlock When Using Sync Lock in Async Code

I'm using tokio, and I encountered a deadlock when trying to lock a synchronous (blocking) Mutex in async code.

I think I have figured out how the deadlock occurs, but I'm not entirely sure my analysis is correct. If there are any mistakes in it, please point them out.

Here's a simplified version of the code I'm working with:

use parking_lot::{Mutex, RwLock};
use std::path::Path;
use std::sync::Arc;
use tokio::task::JoinSet;

struct NumberPool {
    numbers: Vec<Arc<RwLock<i32>>>,
}

impl NumberPool {
    fn new() -> Self {
        NumberPool {
            numbers: Vec::new(),
        }
    }

    async fn get_num(&mut self, i: i32) -> Arc<RwLock<i32>> {
        let num = self.numbers.iter().find(|n| *n.read() == i);
        // if number exists, return it
        if let Some(num) = num {
            return num.clone();
        }
        // if number not exists, create a new one
        // simulate a slow async operation to create a new number
        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
        println!("new number: {i}");
        let num = Arc::new(RwLock::new(i));
        self.numbers.push(num.clone());
        num
    }

    pub fn save(&mut self) -> anyhow::Result<()> {
        let numbers_path = Path::new("numbers.json");
        let numbers: Vec<i32> = self.numbers.iter().map(|n| *n.read()).collect();
        let numbers_json = serde_json::to_string_pretty(&numbers)?;
        std::fs::write(numbers_path, numbers_json)?;
        Ok(())
    }
}

async fn async_task(number_pool: Arc<Mutex<NumberPool>>, i: i32) {
    let num = get_number_from_pool(number_pool.clone(), i % 3).await;
    if i == 0 {
        *num.write() = 100;
        let mut num_pool = number_pool.lock();
        num_pool.save().unwrap();
    }
}

async fn get_number_from_pool(num_pool: Arc<Mutex<NumberPool>>, i: i32) -> Arc<RwLock<i32>> {
    let mut num_pool = num_pool.lock();
    num_pool.get_num(i).await
}

#[tokio::main(worker_threads = 8)]
async fn main() {
    let number_pool = Arc::new(Mutex::new(NumberPool::new()));
    let mut join_set = JoinSet::new();
    for i in 0..10 {
        join_set.spawn(async_task(number_pool.clone(), i));
    }

    join_set.join_all().await;
}

There are only 8 worker threads available, but 10 tasks are spawned. This means 8 tasks start executing, while the other 2 remain queued, waiting for an available worker thread.

One of the running tasks locks the mutex and calls get_num. It then reaches tokio::time::sleep (simulating a slow async operation) and yields its worker thread, so one of the 2 queued tasks gets a chance to execute and also attempts to lock the mutex.

Now all 8 worker threads are blocked waiting for the mutex to be released by the sleeping task: 8 tasks blocked on the lock, 1 task sleeping, and 1 task not started yet.

When the sleep ends, the sleeping task needs to resume, but it cannot, because all worker threads are blocked waiting for the mutex.

Since the sleeping task can never resume and release the lock, the 8 tasks waiting on the Mutex stay blocked forever. Deadlock!

So the simplest solution I can think of is to use tokio::sync::Mutex. Is this the best solution?

Tokio's own documentation answers this, though finding it is a bit of a challenge:

As a rule of thumb, using a synchronous mutex from within asynchronous code is fine as long as contention remains low and the lock is not held across calls to .await.

Your code, very explicitly, holds a lock across an await, in get_number_from_pool:

async fn get_number_from_pool(num_pool: Arc<Mutex<NumberPool>>, i: i32) -> Arc<RwLock<i32>> {
    let mut num_pool = num_pool.lock();
    num_pool.get_num(i).await
}

The lock, represented by the MutexGuard bound to num_pool, is created before the lone .await, but dropped after. This will interfere with tokio's ability to run any other work on that thread. You must use an async mutex (such as the ones provided by tokio) to hold the lock across the await point.
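
For example, here's a minimal sketch of that change, assuming the pool is wrapped in tokio::sync::Mutex instead of parking_lot::Mutex (the same substitution would be needed everywhere Arc<Mutex<NumberPool>> appears, including async_task and main):

use parking_lot::RwLock;
use std::sync::Arc;
use tokio::sync::Mutex;

async fn get_number_from_pool(num_pool: Arc<Mutex<NumberPool>>, i: i32) -> Arc<RwLock<i32>> {
    // tokio's lock() is async: a task waiting for the mutex yields its
    // worker thread instead of blocking it, so the sleeping task can
    // always be resumed and eventually release the lock.
    let mut num_pool = num_pool.lock().await;
    num_pool.get_num(i).await
}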

It's hard to provide further advice, but the overall algorithm could probably be improved as well. Even allowing for async locks, you've structured your code so that only one task can be inside of get_num at a time. Since that's also where the expensive part of your design will go, that effectively means you will do all of the expensive parts strictly serially.

Note that this code doesn't compile on the playground; the error points at the exact position @derspiny mentioned.

error: future cannot be sent between threads safely
   --> src/main.rs:60:24
    |
60  |         join_set.spawn(async_task(number_pool.clone(), i));
    |                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ future returned by `async_task` is not `Send`
    |
    = help: within `impl Future<Output = ()>`, the trait `Send` is not implemented for `*mut ()`, which is required by `impl Future<Output = ()>: Send`
note: future is not `Send` as this value is used across an await
   --> src/main.rs:52:25
    |
51  |     let mut num_pool = num_pool.lock();
    |         ------------ has type `parking_lot::lock_api::MutexGuard<'_, parking_lot::RawMutex, NumberPool>` which is not `Send`
52  |     num_pool.get_num(i).await
    |                         ^^^^^ await occurs here, with `mut num_pool` maybe used later
note: required by a bound in `JoinSet::<T>::spawn`
   --> /playground/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.41.1/src/task/join_set.rs:135:12
    |
132 |     pub fn spawn<F>(&mut self, task: F) -> AbortHandle
    |            ----- required by a bound in this associated function
...
135 |         F: Send + 'static,
    |            ^^^^ required by this bound in `JoinSet::<T>::spawn`

This is because recent versions of parking_lot made its lock guard types !Send to prevent exactly this problem, unless you opt out of the check with the explicit send_guard feature flag.
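
For reference, the opt-out is a feature on the parking_lot dependency in Cargo.toml, something like this (the version number here is just a placeholder):

parking_lot = { version = "0.12", features = ["send_guard"] }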


Yes, I enabled the "send_guard" feature in my project.

This function definition seems highly problematic.

async fn get_num(&mut self, …

&mut self requires exclusive access to the entire NumberPool, and async requires keeping that exclusive access for the entire duration of the async operation.

This seems like a bottleneck that can completely kill parallelism.

Can you use more fine-grained locking and interior mutability? Make it async fn get_num(&self, …), and then multiple get_num operations could run at the same time, interleaved, and you won't need to put NumberPool in a lock at all (see the sketch below).
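
Here's a rough sketch of that shape, assuming the Vec is moved behind an internal parking_lot::Mutex that is only ever held for short synchronous sections, never across an .await:

use parking_lot::{Mutex, RwLock};
use std::sync::Arc;

struct NumberPool {
    numbers: Mutex<Vec<Arc<RwLock<i32>>>>,
}

impl NumberPool {
    // &self: many tasks can run this concurrently; the inner lock is only
    // held for short, synchronous critical sections.
    async fn get_num(&self, i: i32) -> Arc<RwLock<i32>> {
        {
            let numbers = self.numbers.lock();
            if let Some(num) = numbers.iter().find(|n| *n.read() == i) {
                return num.clone();
            }
        } // guard dropped here, before the .await below
        // Slow creation happens without holding any lock.
        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
        let num = Arc::new(RwLock::new(i));
        let mut numbers = self.numbers.lock();
        // Re-check: another task may have created `i` while we were sleeping.
        if let Some(existing) = numbers.iter().find(|n| *n.read() == i) {
            return existing.clone();
        }
        numbers.push(num.clone());
        num
    }
}

The pool can then be shared as a plain Arc<NumberPool> with no outer mutex, and save can take &self the same way.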

Alternatively, can you make it fn get_num(&mut self, …) -> impl Future + 'static? That would allow you to release the lock on self before starting to await the future, so the locking would be synchronous and hopefully relatively quick. The catch is that the future has to be independent of self's loan, so it needs async move and Arc, etc., with no temporary references crossing into the async move {} block.
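
That version could look something like the sketch below, assuming numbers is changed to an Arc<Mutex<Vec<…>>> so the returned future owns everything it captures (the exact layout is illustrative):

use parking_lot::{Mutex, RwLock};
use std::future::Future;
use std::sync::Arc;

struct NumberPool {
    // Shared handle so the returned future can insert without borrowing self.
    numbers: Arc<Mutex<Vec<Arc<RwLock<i32>>>>>,
}

impl NumberPool {
    fn get_num(&mut self, i: i32) -> impl Future<Output = Arc<RwLock<i32>>> + 'static {
        // Synchronous part: runs while the caller still holds its lock on the pool.
        let existing = self.numbers.lock().iter().find(|n| *n.read() == i).cloned();
        let numbers = Arc::clone(&self.numbers);
        // The future captures only owned values (existing, numbers, i),
        // so it is 'static and can be awaited after the pool lock is dropped.
        async move {
            if let Some(num) = existing {
                return num;
            }
            tokio::time::sleep(std::time::Duration::from_secs(1)).await;
            let num = Arc::new(RwLock::new(i));
            numbers.lock().push(num.clone());
            num
        }
    }
}

The caller then takes the pool lock only for the synchronous call:

let fut = number_pool.lock().get_num(i % 3);
let num = fut.await;

The guard is a temporary that is dropped at the end of the first statement, so nothing is held across the await.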