MutexGuard "cannot be sent" inside Future generator

Hi, I am trying to use a Mutex inside a Futures (0.3) generator, but I get a compilation error.
Simplified example:

#![feature(futures_api, pin, async_await, await_macro, arbitrary_self_types)]
#![feature(generators)]

use futures::executor::ThreadPool;
use futures::task::SpawnExt;
use futures::channel::oneshot;
use std::sync::Mutex;

async fn my_task() {
    let m = Mutex::new(3u32);
    let m_guard = m.lock();
    let (sender, receiver) = oneshot::channel::<()>();
    sender.send(());
    await!(receiver);
}

fn main() {
    let mut thread_pool = ThreadPool::new().unwrap();
    thread_pool.spawn(my_task());
}

Compilation error:

$ cargo run
   Compiling check_futures_mutex_guard v0.1.0 (/home/real/temp/check_futures_mutex_guard)                                                                                                                          
error[E0277]: `std::sync::MutexGuard<'_, u32>` cannot be sent between threads safely                                                                                                                               
  --> src/main.rs:19:17                                                                                                                                                                                            
   |                                                                                                                                                                                                               
19 |     thread_pool.spawn(my_task());                                                                                                                                                                             
   |                 ^^^^^ `std::sync::MutexGuard<'_, u32>` cannot be sent between threads safely                                                                                                                  
   |                                                                                                                                                                                                               
   = help: within `impl core::future::future::Future`, the trait `for<'r> std::marker::Send` is not implemented for `std::sync::MutexGuard<'_, u32>`                                                               
   = note: required because it appears within the type `std::result::Result<std::sync::MutexGuard<'_, u32>, std::sync::PoisonError<std::sync::MutexGuard<'_, u32>>>`                                               
   = note: required because it appears within the type `for<'r, 's> {std::sync::Mutex<u32>, std::result::Result<std::sync::MutexGuard<'r, u32>, std::sync::PoisonError<std::sync::MutexGuard<'s, u32>>>, futures_channel::oneshot::Sender<()>, futures_channel::oneshot::Receiver<()>, ()}`
   = note: required because it appears within the type `[static generator@src/main.rs:9:20: 15:2 for<'r, 's> {std::sync::Mutex<u32>, std::result::Result<std::sync::MutexGuard<'r, u32>, std::sync::PoisonError<std::sync::MutexGuard<'s, u32>>>, futures_channel::oneshot::Sender<()>, futures_channel::oneshot::Receiver<()>, ()}]`
   = note: required because it appears within the type `std::future::GenFuture<[static generator@src/main.rs:9:20: 15:2 for<'r, 's> {std::sync::Mutex<u32>, std::result::Result<std::sync::MutexGuard<'r, u32>, std::sync::PoisonError<std::sync::MutexGuard<'s, u32>>>, futures_channel::oneshot::Sender<()>, futures_channel::oneshot::Receiver<()>, ()}]>`
   = note: required because it appears within the type `impl core::future::future::Future`                                                                                                                         
   = note: required because it appears within the type `impl core::future::future::Future`        

Note that if I remove the lines:

let (sender, receiver) = oneshot::channel::<()>();
sender.send(());
await!(receiver);

I get no compilation error.

rustc version:

$ rustc --version
rustc 1.31.0-nightly (bef62ccdd 2018-10-16)

I understand that I can not send a MutexGuard, but I think that in my code I do not actually send it anywhere. It stays inside the generator for the whole time. Any ideas how to solve this are appreciated!

My Cargo.toml:

[package]
name = "check_futures_mutex_guard"
version = "0.1.0"
authors = ["real"]
edition = "2018"

[dependencies]

futures-preview = { version = "0.3.0-alpha.9" }

Just a wild guess, but will dropping the m_guard before await! call help? My understanding is that it currently is dropped after await (a suspension point), and that's why it needs to be send.

1 Like

I guessed so too, but I still get the same compilation error with the following code:

async fn my_task() {
    let m = Mutex::new(3u32);
    let m_guard = m.lock();
    drop(m_guard);
    let (sender, receiver) = oneshot::channel::<()>();
    sender.send(());
    await!(receiver);
}

What if you put m_guard into a block scope? i.e.

let m = Mutex::new(3u32);
{
    let m_guard = m.lock();
}
...

@vitalyd: Amazing, this one works! I wonder why though.

Probably the generator transform is still allocating the MutexGuard stack slot in the generator state instead of keeping it on the real stack, even though drop will cause it to never contain a valid value when the generator function returns. Moving it into an inner block means the stack slot itself doesn't really exist once it hits the yield.

It might be worth writing a minimal repro and opening an issue to see if the generator transform can be made smarter for these cases.

(EDIT: Here's a playground showing the same sort of issue with non-pinned generators, presumably stemming from the same underlying issue so fixing one would fix the other)

2 Likes

@Nemo157: Thanks for your reply!
I wouldn't have guessed that the example you put on the playground is related to the "mutex can not be sent" issue. I guess I still have some stuff to learn about generators. I never used them raw this way.

I think it might be useful to the tell the full story of what I was trying to do. It's a bit of a long story, but It might be of interest for someone.

One thing that I want for my Futures code is something like an asynchronous mutex, let's call it AsyncMutex. I began working on something like that here.
I never needed something like this for my real code, because I mostly use channels, but for testing something like AsyncMutex can be really useful to mock some things.

Let me give you an example. Consider the following trait (Might not compile, this is pseudo code):

struct Connection;
trait Connector {
    fn connect(&self) -> FutureObj<Some(Connection)> {};
}

Take special note at the signature of connect. It takes &self, and not &mut self. It is important because I want to be able to call connector.connect() many times in the same scope, possibly inside a loop.

During testing I want a way to mock a Connector. I could do something like this:

struct DummyConnector {
    receiver: mpsc::Receiver<Connection>,
}

impl DummyConnector {
    pub fn new(receiver: mpsc::Receiver<Connection>) {
        DummyConnector {receiver}
    }
}

impl Connector for DummyConnector {
    fn connect(&self) -> FutureObj<Some<Connection>> {
        FutureObj::new(self.receiver.next().boxed())
    }
}

Of course this will not compile, because receiver.next() requires access to &mut receiver, which in turns requires access to &mut self.

Instead, if I had something like AsyncMutex, I could do this:

struct DummyConnector {
    amutex_receiver: AsyncMutex<mpsc::Receiver<Connection>>,
}

Then the implementation of Connector::connect will be something like this:

fn connect(&self) -> FutureObj<Option<Connection>> {
    let fut_conn = amutex_receiver.acquire_borrow(|receiver| receiver.next())
    FutureObj::new(fut_conn.boxed())
}

I never managed to make the connect() implementation above to compile, because of some lifetimes issue I can not understand. The real implementation of the connect() function is not very different from the one I just described. This is what it looks like:

fn connect(&self, _address: A) -> FutureObj<Option<ConnPair<Self::SendItem, Self::RecvItem>>> {
    let amutex_receiver = self.amutex_receiver.clone();
    let fut_conn_pair = async move {
        await!(amutex_receiver.acquire_borrow(|receiver| receiver.next()))
    };
    let future_obj = FutureObj::new(fut_conn_pair.boxed());
    future_obj
}

I had to do some async move tricks to avoid borrow checker problems.
The compile error looks like this:

163 |                 await!(amutex_receiver.acquire_borrow(|receiver| receiver.next()))                                                                                                                           
    |                                                        --------- ^^^^^^^^^^^^^^^ returning this value requires that `'1` must outlive `'2`                                                                   
    |                                                        |       |                                                                                                                                             
    |                                                        |       return type of closure is futures_util::stream::next::Next<'2, futures_channel::mpsc::Receiver<client::connector::ConnPair<SI, RI>>>          
    |                                                        has type `&'1 mut futures_channel::mpsc::Receiver<client::connector::ConnPair<SI, RI>>`              

acquire_borrow()'s signature is as follows:

pub fn acquire_borrow<'a,'b:'a, F:'a,B:'a,O:'a>(&'b self, f: F) -> impl Future<Output=O> + 'a
    where
        F: FnOnce(&mut T) -> B,
        B: Future<Output=O>,
{/*...*/}

I might have messed up something with the lifetimes, but I could never manage to make it work. Maybe someone here will have an idea.

After a while I realized that the usual Mutex has an interface that is different from the one I implemented (closure based acquire_borrow()), and I thought that maybe if I had a similar design I could somehow overcome the borrow checker's problems, however, I got into different issues: MutexGuard can not be sent.

Maybe a design of AsyncMutex with an AsyncMutexGuard is not the right thing to do, because in the Futures world things are sent all the time. An AsyncMutexGuard that can not be sent may not be very useful.

I thought about the problem with the connector for the night, and I think I might be wanting too many things at the same time. Maybe a good solution would be to require &mut self for the Connector's connect trait method, and make sure that a Connector is cloneable, somewhat like mpsc::Sender behaves.

struct Connection;
trait Connector {
    fn connect(&mut self) -> FutureObj<Some(Connection)> {};
}
1 Like

I think an AsyncMutex would be really useful, I would expect the API to be something like

impl<T> AsyncMutex<T> {
    fn lock(&self) -> impl Future<Output = LockResult<AsyncMutexGuard<'_, T>>> + '_;
}

impl<T> {Deref, DerefMut, etc.} for AsyncMutexGuard<'_, T> {
}

/// for<'a, T> AsyncMutexGuard<'a, T>: Send

The main difference between this and a normal mutex would be support for the waker system, so when you attempt to acquire the mutex the task will yield and then when you unlock a mutex it will wake tasks that are waiting to acquire it.

Then your dummy connector could be something like

struct DummyConnector {
    receiver: Arc<AsyncMutex<mpsc::Receiver<Connection>>>,
}

fn connect(&self) -> FutureObj<Option<Connection>> {
    let receiver = self.receiver.clone();
    FutureObj::new(async move {
        let rx_guard = await!(receiver.lock()).unwrap();
        await!(rx_guard.next())
    }.boxed())
}

futures-rs actually has an implementation of a more limited form of async mutex, BiLock for locking the read and write halves of a bi-directional IO stream. Taking a glance at its API now it is very similar to what I posited above :smile:, it's just limited to a max of 2 handles, rather than supporting an arbitrary number of waiting tasks.

My current attempt at the design of the AsyncMutexGuard is on this branch. It's not on master yet because it is still not compiling.

I have been thinking about this for a while. I assume MutexGuard is not Send for a good reason: we don't want it to be sent between threads. What happens if we let something like AsyncMutexGuard to be sent between threads? I still can't figure out if something bad could happen.

I should take a look at the BiLock implementation, I can probably learn something from its implementation.
I tried using UnsafeCell and then Mutex internally to protect the resource in the AsyncMutex implementation, but both (UnsafeCell and MutexGuard) have problems being sent, which in turn turns AsyncMutexGuard to be not Send.

Thanks to your advice I think that I now have a working version of the AsyncMutex.
You can see it here: GitHub - realcr/async_mutex: Rust Asynchronous mutex for Futures [DEPRECATED, DO NOT USE]

1 Like

Looks good :smile:, I'm now unsure about the impl Send for AsyncMutexGuard though, since it's a smart non-owning pointer maybe it should be

impl<'a, T: Sync> Send for AsyncMutexGuard {}

but it seems like the same argument should apply to MutexGuard which doesn't have that implementation... Maybe that's because Mutex is built on top of system primitives which can be thread locked or something?

EDIT: Or since it allows mutable access

impl<'a, T: Send> Send for AsyncMutexGuard {}

to match &mut's impl?

1 Like

I have to admit I'm not comfortable with the three lines:

unsafe impl<'a,T> Send for AsyncMutexGuard<'a,T> {}
unsafe impl<T: Send> Send for AsyncMutex<T> {}
unsafe impl<T: Send> Sync for AsyncMutex<T> {}

I'm not sure that I know what I'm doing there. The last two I copied from std's Mutex implementation (BiLock had them too). I am not sure I fully understand the implications of this. I think that I should go and find out what were the Mutex people thinking about when they wrote this.

I'm also not sure about the implications of the first line. MutexGuard is not Send, I wonder what could go wrong.

It's unfortunate that this is the workaround for the problem, which is pretty hacky. It doesn't work so well when you have nested scopes, and requires turning nicely formatted code into an ugly mess just to avoid an unsafe wrapper. We can use tokio's synchronization primitives since the lockguards implement Send+Sync, but the added overhead to the program isn't worth it compared to adding one line of unsafe code

Please don't revive really old threads