Async mutexes and poll_fn - Other lock future never wakes up

I have a case that I need to lock an async mutex within poll_fn, however after the first mutex unlocks, the other lock future never wakes up.

I'm not sure if I'm doing something wrong or if this is a bug in tokio. Any ideas?

Minimal example:

use futures_util::future::poll_fn;
use std::future::Future;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;
use tokio::task;
use tokio::time::delay_for;

async fn lock_and_delay(mutex: Arc<Mutex<()>>, id: u32) {
    {
        let _guard = poll_fn(|cx| {
            println!("poll_fn called");
            let lock_fut = mutex.lock();
            pin_utils::pin_mut!(lock_fut);
            lock_fut.poll(cx)
        })
        .await;

        println!("locked: {}", id);
        delay_for(Duration::from_secs(1)).await;
    }

    println!("unlocked: {}", id);
}

#[tokio::main]
async fn main() {
    let mutex = Arc::new(Mutex::new(()));

    let t1 = task::spawn(lock_and_delay(Arc::clone(&mutex), 1));
    let t2 = task::spawn(lock_and_delay(Arc::clone(&mutex), 2));

    t1.await.unwrap();
    t2.await.unwrap();
}

The program gets stuck and the output is:

poll_fn called
locked: 1
poll_fn called
unlocked: 1

I'm willing to bet there's a bug involved in starting mutex.lock() futures only to drop them before they resolve?

I don't think this is the intended usage of tokio::sync::Mutex::lock(), but it shouldn't completely fail like this either? Like, the typical usage is to just grab one lock future, and poll it to completion, rather than creating a new lock future every time you poll. I bet some structure internal to Mutex is giving control to the old/dropped/failed lock future.

Certainly an interesting bug. I would highly recommend reporting to tokio!


If your goal here is to get working code though, I also recommend reusing the future produced by mutex.lock() multiple times rather than recreating it within poll_fn. It no longer locks if poll_fn is replaced with this:

let mut lock_fut = Box::pin(mutex.lock());
let _guard = poll_fn(move |cx| {
    println!("poll_fn called");
    lock_fut.as_mut().poll(cx)
})
.await;
2 Likes

Basically the issue is that you're recreating the future that waits for the lock to become available on every poll, and if it doesn't succeed, you throw it away. Since you threw the future away, you should not expect to be woken up by that future.

2 Likes

Indeed the issue was that future was never woken up because it was dropped.
Thanks a lot!

The fixed version:

use futures_util::future::poll_fn;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;
use tokio::task;
use tokio::time::delay_for;

async fn lock_and_delay(mutex: Arc<Mutex<()>>, id: u32) {
    {
        let mut lock_fut = mutex.lock();

        let _guard = poll_fn(|cx| {
            println!("poll_fn called");
            let lock_fut = unsafe { Pin::new_unchecked(&mut lock_fut) };
            lock_fut.poll(cx)
        })
        .await;

        println!("locked: {}", id);
        delay_for(Duration::from_secs(1)).await;
    }

    println!("unlocked: {}", id);
}

#[tokio::main]
async fn main() {
    let mutex = Arc::new(Mutex::new(()));

    let t1 = task::spawn(lock_and_delay(Arc::clone(&mutex), 1));
    let t2 = task::spawn(lock_and_delay(Arc::clone(&mutex), 2));

    t1.await.unwrap();
    t2.await.unwrap();
}