Try_lock on futures::lock::Mutex outside of async?

I'm trying to implement AsyncRead for a struct that has a futures::lock::Mutex:

pub struct SmolSocket<'a> {
    stack: Arc<futures::lock::Mutex<SmolStackWithDevice<'a>>>,
}

impl<'a> AsyncRead for SmolSocket<'a>  {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut tokio::io::ReadBuf<'_>
    ) -> Poll<std::io::Result<()>> {
        block_on(self.stack).read(...)
    }
}

The problem is that, since poll_read is not async, I cannot call await. I also don't want to use block_on, as it would block. I could call try_lock and, if it fails, register a Waker to be called by SmolSocket in the future.

Since I cannot do that either because it's not async, is there a version of block_on that does the same as try_lock for futures::lock::Mutex outside of async?

If you look at Future::poll, you'll see that the signature is very similar to AsyncRead::poll_read. This is because you cannot have async functions in traits (yet?), so AsyncRead mimics Futures as a workaround.

In fact, awaiting a future polls it, and returns Poll::Pending if the polled future does so. To implement AsyncRead, you'll have to do it too.
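
To make the "return Poll::Pending if the polled future does so" part concrete, here is a minimal std-only sketch. The types ReadyOnSecondPoll and Doubler are made up for illustration (they are not part of futures or tokio), and the no-op waker is only there so the future can be driven by hand. The outer poll function has the same shape as poll_read: it polls something inner and forwards Pending.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{ready, Context, Poll, Wake, Waker};

/// A waker that does nothing; enough to poll a future by hand.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical inner future: pending on the first poll, ready on the second.
struct ReadyOnSecondPoll {
    polled: bool,
}
impl Future for ReadyOnSecondPoll {
    type Output = u8;
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u8> {
        if self.polled {
            Poll::Ready(42)
        } else {
            self.polled = true;
            // Before returning Pending, arrange to be polled again.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

/// Hypothetical outer future that polls the inner one, like `.await` would.
struct Doubler {
    inner: ReadyOnSecondPoll,
}
impl Future for Doubler {
    type Output = u16;
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u16> {
        // `ready!` returns Poll::Pending from this function if the inner poll is pending.
        let v = ready!(Pin::new(&mut self.inner).poll(cx));
        Poll::Ready(u16::from(v) * 2)
    }
}

/// Polls the same stored `Doubler` twice and returns both results.
fn poll_twice() -> (Poll<u16>, Poll<u16>) {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Doubler { inner: ReadyOnSecondPoll { polled: false } };
    let first = Pin::new(&mut fut).poll(&mut cx);
    let second = Pin::new(&mut fut).poll(&mut cx);
    (first, second)
}

fn main() {
    println!("{:?}", poll_twice());
}
```

Note that the inner future lives inside Doubler, so the second poll continues from where the first one stopped.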

So poll_read is async? How does Rust know that and let me use self.stack.lock()?

Why do I need to poll the guard here but not in an async function?

No, poll_read is not async.
Actually, from the outside an async function is a normal function. The only special thing is that its return type implements Future. So you can call one from anywhere, but it won't actually do anything unless you poll its return value.

In an async block or function, polling is automatically done by the compiler when you .await it. Here poll_read is not async, so you have to do it manually.
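
A small std-only sketch of that claim, in case it helps. The function set_flag is a made-up example and the no-op waker is just a stand-in for whatever an executor would provide. Calling the async fn from a normal function only builds the future; the body runs when the future is polled.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// A waker that does nothing; enough to poll a future by hand.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical async fn: it sets the flag only once its future is polled.
async fn set_flag(flag: Arc<AtomicBool>) -> u32 {
    flag.store(true, Ordering::SeqCst);
    7
}

/// Returns (flag before polling, flag after polling, poll result).
fn demo() -> (bool, bool, Poll<u32>) {
    let flag = Arc::new(AtomicBool::new(false));
    // Calling the async fn from a non-async fn is allowed; nothing has run yet.
    let fut = pin!(set_flag(flag.clone()));
    let before = flag.load(Ordering::SeqCst);

    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    // Polling by hand is what `.await` does for you inside async code.
    let result = fut.poll(&mut cx);
    (before, flag.load(Ordering::SeqCst), result)
}

fn main() {
    let (before, after, result) = demo();
    println!("before={before} after={after} result={result:?}");
}
```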

Ok, I thought about it a bit more, and I realized that the code I posted above was plain wrong (I removed it), because it created a new future for MutexGuard each time poll_read was called (well, it might work, but clearly not in an efficient and reliable way).
Instead, you have to store some additional state in SmolSocket, which likely makes it a self-referential struct, and that is where things get tricky.

Something is weird in SmolSocket, though: you combine an Arc (owned reference-counted data) with a SmolStackWithDevice<'a> (borrowed data). The two models are quite orthogonal, so I am not sure what you want there. Can you clarify this point?

Anyway, here is an idea for an implementation: Rust Playground
You'll note that it uses unsafe code, due to the self-referential struct.

If you really were outside of async, then you could have used block_on, but you are not truly outside the async context. What you are trying to do would block the thread. To make this work, you would have to store the futures::lock::MutexLockFuture inside the struct and poll that future until you get a lock. Unfortunately this would make your struct self-referential, as the future would hold a reference to the mutex, and such structs are not possible in safe Rust.

What you are trying to do here is pretty difficult.

This is a good exercise, I guess.

Suppose we have an IP stack Arc<futures::Mutex<Stack>>. Suppose that I want to implement AsyncRead and AsyncWrite for it. There must be a way to do it, as this is a problem that should arise frequently. I see no bad assumption in my project design. I don't see another way to share an IP stack with lots of threads and implement AsyncWrite/AsyncRead for it.

I've been trying to think about other designs for my project, but I don't see an option.

Shouldn't self.stack.lock() be pollable too? Wouldn't this mean that it's just a matter of calling self.stack.lock() and then polling it? If it returns pending, return that, otherwise return the result. The fact that we called poll on it means that it now knows to call wake when it's ready.

The issue is that if you drop the future object returned by lock(), then it is no longer guaranteed that you will receive a wakeup. Therefore you must store the future around from poll to poll.
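
Here is a toy std-only model of that behavior, as a sketch only. ToyLock and Acquire are invented for illustration and are much simpler than futures::lock::Mutex (single waiter, no guard). The assumption being modeled is the one stated above: the pending future is your place in line, and dropping it removes your waker.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

/// Counts how many times it was woken.
struct CountingWake(AtomicUsize);
impl Wake for CountingWake {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Toy async lock: (held flag, waker of the single waiter).
#[derive(Default)]
struct ToyLock {
    state: Mutex<(bool, Option<Waker>)>,
}
impl ToyLock {
    fn acquire(&self) -> Acquire<'_> {
        Acquire { lock: self }
    }
    fn release(&self) {
        let mut s = self.state.lock().unwrap();
        s.0 = false;
        // Wake whoever is still waiting, if anyone.
        if let Some(w) = s.1.take() {
            w.wake();
        }
    }
}

/// The pending "lock future": it represents a place in line.
struct Acquire<'a> {
    lock: &'a ToyLock,
}
impl Future for Acquire<'_> {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let mut s = self.lock.state.lock().unwrap();
        if s.0 {
            s.1 = Some(cx.waker().clone()); // register interest
            Poll::Pending
        } else {
            s.0 = true;
            Poll::Ready(())
        }
    }
}
impl Drop for Acquire<'_> {
    fn drop(&mut self) {
        // Giving up the place in line also removes the registered waker.
        self.lock.state.lock().unwrap().1 = None;
    }
}

/// Number of wakeups received when the lock is released.
fn wakeups(keep_future: bool) -> usize {
    let lock = ToyLock::default();
    lock.state.lock().unwrap().0 = true; // pretend someone else holds it
    let counter = Arc::new(CountingWake(AtomicUsize::new(0)));
    let waker = Waker::from(counter.clone());
    let mut cx = Context::from_waker(&waker);

    let mut fut = lock.acquire();
    assert!(Pin::new(&mut fut).poll(&mut cx).is_pending());
    if keep_future {
        lock.release(); // future still alive: we get woken
        drop(fut);
    } else {
        drop(fut); // what happens when poll_read returns
        lock.release(); // nobody is registered any more
    }
    counter.0.load(Ordering::SeqCst)
}

fn main() {
    println!("kept: {}, dropped: {}", wakeups(true), wakeups(false));
}
```

In the dropped case the task would have returned Poll::Pending with no waker registered anywhere, so nothing would ever poll it again.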

Unfortunately this would make your struct self-referential, as the future would hold a reference to the mutex

Just wondering, wouldn't that be fine since we are pinned anyway? My first attempt here would be to have an Option<futures::lock::MutexLockFuture> on the struct and store the future in there to poll it to completion. Then replace it with None again.

Sure, it's certainly not impossible exactly because it is pinned. The problem is that taking advantage of it being pinned requires unsafe code.

impl AsyncWrite for SmolSocket {
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<Result<usize, std::io::Error>> {
        let mut stack = match self.stack.lock().boxed().as_mut().poll(cx) {
            Poll::Pending => return Poll::Pending,
            Poll::Ready(stack) => stack,
        };
        stack.tcp_socket_send(self.socket_handle.clone(), buf);
        Poll::Ready(Ok(buf.len()))
    }
}

What about this? I don't see where it's dropped.

The return value of lock() is immediately dropped when you return from poll in that example.

self.stack.lock() returns a future. This future is polled. If it's pending, then I return. Then the future is dropped, because I returned, but does it need to be constantly polled? In the next try a new future is going to be returned.

If you create a new future every time, it won't work.

By making the AsyncRead implementer borrow from the SmolSocket (and requiring SmolStackWithDevice to be Unpin) I managed to write it without unsafe:

struct SmolSocket<'a> {
    stack: Arc<futures::lock::Mutex<SmolStackWithDevice<'a>>>,
}
impl<'a> SmolSocket<'a> {
    fn read(&'a self) -> Reader<'a> {
        Reader::Locking(self.stack.lock())
    }
}
pub enum Reader<'a> {
    Locking(futures::lock::MutexLockFuture<'a, SmolStackWithDevice<'a>>),
    Locked(futures::lock::MutexGuard<'a, SmolStackWithDevice<'a>>),
}
impl<'a> AsyncRead for Reader<'a> {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut tokio::io::ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
        let this = self.get_mut();
        match this {
            Reader::Locking(f) => {
                *this = Reader::Locked(ready!(Pin::new(f).poll(cx)));
                println!("acquired lock");
                Pin::new(this).poll_read(cx, buf)
            }
            Reader::Locked(l) => Pin::new(&mut **l).poll_read(cx, buf),
        }
    }
}

Playground

impl<'a> AsyncRead for SmolSocket<'a> {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut tokio::io::ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
        let mut lock_fut = self.stack.lock();
        let pinned_lock_fut = Pin::new(&mut lock_fut);
        let mut guard = ready!(pinned_lock_fut.poll(cx));
        println!("acquired lock");
        let pinned_inner = Pin::new(&mut *guard);
        pinned_inner.poll_read(cx, buf)
    }
}

this looks promising but I still see no major differences from the post I thought worked. I don't understand how it didn't work.

About yours, it looks like Reader starts in Locking mode. Every time poll_read is called on it, it tries to lock. If that works, then it changes itself to Locked and also does a poll.

Ok, but now after poll_read returns, Reader is in the Locked status. Now every new call to poll_read will never touch the Locking state again, only Locked one. Why?

Thank you for the answer by the way!

Your approach drops the lock_fut when you return from poll_read. Most futures will not emit notifications if you drop the future, so yours would only work if the mutex is immediately available.

Yours (after pinning was added) works when there is no competition for the lock, as on a typical poll_read you would:

  1. successfully lock the lock (without receiving Poll::Pending even once)
  2. successfully read.

However, if there is competition, then as soon as you don't instantly get the lock, you bail out and drop the MutexLockFuture (which represents your place in line to lock the Mutex) without making use of its waking capability.

let mut stack = match self.stack.lock().boxed().as_mut().poll(cx) {
    Poll::Pending => return Poll::Pending,
    Poll::Ready(stack) => stack,
};
// the lockfuture has been `Drop`ped at this point, which de-registers your interest in the availability of the Mutex

is basically equivalent to

let mut stack = match self.stack.try_lock() {
    None => return Poll::Pending,
    Some(stack) => stack,
};

(+ additional thrashing of registering then deregistering the waking rather than it never existing in the first place)

It's said that anyone implementing a poll of some kind is responsible for having wakers registered if returning Poll::Pending, lest they block forever.

Further to this, even if this deadlock could be avoided somehow, there is a second problem, which is what the many lines of "acquired lock" show. In each of these cases:

  • underlying read call filled your buffer
  • you successfully read some amount, but not everything you wanted
  • the underlying AsyncRead was not ready (returned Pending)

you will undesirably drop the successfully acquired lock guard, which means:

  1. you will have to acquire it again next time you poll_read
  2. someone else might grab it in the meantime, and read some data from the middle of what you were expecting, so you end up with garbage.

However, with a reader adapter that locks once and stays locked until it's dropped, you might be able to produce reasonable behavior.

I think I understood. If I return Poll::Pending it's my responsibility to store a Waker and call this waker when some data is available. For example, I store a waker and when a socket delivers data on the other side, it calls this waker. However, in the Mutex case, there's no other side. There's nothing looking to see if the Mutex got unlocked, so if I ever return a Poll::Pending I risk the thing not being polled again ever.

However, let's analyze this:

match this {
            Reader::Locking(f) => {
                *this = Reader::Locked(ready!(Pin::new(f).poll(cx)));
                println!("acquired lock");
                Pin::new(this).poll_read(cx, buf)
            }
            Reader::Locked(l) => Pin::new(&mut **l).poll_read(cx, buf),
        }

On *this = Reader::Locked(ready!(Pin::new(f).poll(cx)));, in case poll(cx) returns Poll::Pending (so ready! returns early), there's no problem, since the MutexLockFuture is already stored on self from the beginning. However, if it is ready, then we update this with a Reader::Locked(MutexGuard<...>). Subsequent poll_read calls will always poll through the MutexGuard.

Now there's just one problem: I don't see any logic that transforms Reader::Locked back into Reader::Locking. So, how does the mutex unlock so others can use it? Also, could you explain &mut **l?
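
On the last two questions, a std-only sketch may help. It uses std::sync::Mutex as a stand-in, on the assumption that the relevant parts carry over: futures::lock::MutexGuard also implements Deref/DerefMut and releases the lock when it is dropped. The function bump is made up for illustration. Under that assumption, Reader never needs to go back to Locking: the mutex unlocks when the Reader (and the guard inside it) is dropped.

```rust
use std::sync::{Mutex, MutexGuard};

/// Takes the guard by `&mut`, like the `l` bound in `Reader::Locked(l)`.
fn bump(l: &mut MutexGuard<'_, Vec<u8>>) -> usize {
    // l    : &mut MutexGuard<Vec<u8>>
    // *l   : MutexGuard<Vec<u8>>   (strip the outer `&mut`)
    // **l  : Vec<u8>               (go through the guard's DerefMut)
    // &mut **l : &mut Vec<u8>      (a plain mutable reference to the data)
    let inner: &mut Vec<u8> = &mut **l;
    inner.push(1);
    inner.len()
}

/// Returns (length after bump, locked while guard alive, free after drop).
fn demo() -> (usize, bool, bool) {
    let m = Mutex::new(Vec::new());
    let mut guard = m.lock().unwrap();
    let len = bump(&mut guard);
    // While the guard is alive, nobody else can take the lock.
    let locked_while_guard_alive = m.try_lock().is_err();
    // Dropping the guard is what unlocks the mutex; there is no explicit unlock call.
    drop(guard);
    let free_after_drop = m.try_lock().is_ok();
    (len, locked_while_guard_alive, free_after_drop)
}

fn main() {
    println!("{:?}", demo());
}
```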