FuturesUnordered: help me find the deadlock?

NOTE: The timer is always set to 1 second: smol::Timer::after(Duration::from_secs(1)).

use futures::pin_mut;
use futures::AsyncWrite;
use futures::AsyncWriteExt;
use futures::Future;
use futures_util::future::FutureExt;
use pin_project::pin_project;
use smol::{net::{TcpStream, TcpListener}, lock::{Mutex, futures::LockArc, MutexGuard, MutexGuardArc}};
use std::{pin::Pin, sync::Arc, task::{Context, Poll}};

#[pin_project]
pub struct SharedWriter<W> {
    #[pin] writer: Arc<Mutex<W>>,
    writer_lock: Option<MutexGuardArc<W>>,
    f: smol::Timer,
}

impl<W: AsyncWrite + Unpin> Future for SharedWriter<W> {
    type Output = std::io::Result<()>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut this = self.project();

        futures::ready!(this.f.poll_unpin(cx));

        Poll::Ready(Ok(()))
    }
}

Then I use the above code like this:

        use futures::StreamExt; // provides .next() on streams

        let mut f = futures::stream::FuturesUnordered::new();
        f.push(shared_writer1);
        f.push(shared_writer2);
        f.push(shared_writer3);

        while let Some(r) = f.next().await {
            println!("{:?}", r.unwrap());
        }

There are no problems: FuturesUnordered runs each shared_writer one after the other. If I change SharedWriter to only acquire the lock, with no timer involved, it also works fine, and FuturesUnordered executes each shared_writer one by one.

impl<W: AsyncWrite + Unpin> Future for SharedWriter<W> {
    type Output = std::io::Result<()>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut this = self.project();
        let guard = if this.writer_lock.is_none() {
            let lock = this.writer.lock_arc();
            pin_mut!(lock);
            *this.writer_lock = Some(futures::ready!(lock.poll(cx)));
            this.writer_lock.as_mut().unwrap()
        } else {
            this.writer_lock.as_mut().unwrap()
        };

        *this.writer_lock = None; // drop lock
        Poll::Ready(Ok(()))
    }
}

But when I add both the timer and the lock acquisition, FuturesUnordered stops after completing the first shared_writer, and the program deadlocks.

impl<W: AsyncWrite + Unpin> Future for SharedWriter<W> {
    type Output = std::io::Result<()>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut this = self.project();
        let guard = if this.writer_lock.is_none() {
            let lock = this.writer.lock_arc();
            pin_mut!(lock);
            *this.writer_lock = Some(futures::ready!(lock.poll(cx)));
            this.writer_lock.as_mut().unwrap()
        } else {
            this.writer_lock.as_mut().unwrap()
        };

        futures::ready!(this.f.poll_unpin(cx)); // causes problem
        *this.writer_lock = None; // drop lock
        Poll::Ready(Ok(()))
    }
}

I don't see how this is a problem. Acquire the lock, poll the timer; whether the timer returns Poll::Pending or Poll::Ready doesn't matter. Eventually the timer will return Poll::Ready and the lock will be released, so someone will make progress.

I mention FuturesUnordered because if I just await each shared_writer future one by one without it, either implementation works fine and doesn't deadlock.

shared_writer1.await.unwrap();
shared_writer2.await.unwrap();
shared_writer3.await.unwrap();

With the calling code above, neither implementation has any problems.

This code creates a LockArc future, polls it once, and if it does not complete, throws out the future and creates a new one the next time SharedWriter is polled. Depending on the exact policy/implementation of the mutex you're using, that might or might not be the cause of your deadlock (because the LockArc never gets polled to completion), but regardless, it's incorrect or at least inefficient.

You need to store the LockArc and reuse it on the next poll(), instead of creating a new one.
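A minimal, crate-free sketch of that fix, under stated assumptions: `LockThenWait`, `PendingFirst`, and `drive` are hypothetical names, the generic `L` stands in for LockArc and `T` for smol::Timer, and both are assumed to be `Unpin` (if LockArc is not, it would need a `#[pin]` field or a `Pin<Box<...>>`). The lock future is created once, kept in the struct while it returns Pending, and dropped only after it returns Ready. Requires Rust 1.64+ for `std::task::ready!`.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{ready, Context, Poll, Wake, Waker};

/// Hypothetical lock-then-wait future: keeps the in-flight lock future between
/// polls instead of recreating it, then holds the guard until the timer fires.
struct LockThenWait<L: Future, T> {
    lock: Option<L>,          // in-flight lock future (stand-in for LockArc)
    guard: Option<L::Output>, // acquired guard (stand-in for MutexGuardArc)
    timer: T,                 // stand-in for smol::Timer
}

impl<L, T> Future for LockThenWait<L, T>
where
    L: Future + Unpin,
    L::Output: Unpin,
    T: Future + Unpin,
{
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let this = &mut *self;
        if this.guard.is_none() {
            let lock = this.lock.as_mut().expect("polled after completion");
            // On Pending we return early, but the lock future stays in `this.lock`,
            // so the next poll resumes it instead of starting a new one.
            let guard = ready!(Pin::new(lock).poll(cx));
            this.lock = None; // it has completed, so dropping it now is fine
            this.guard = Some(guard);
        }
        ready!(Pin::new(&mut this.timer).poll(cx));
        this.guard = None; // release the lock
        Poll::Ready(())
    }
}

/// Hypothetical toy future: Pending on the first poll (waking itself), Ready on the next.
struct PendingFirst<V> {
    value: Option<V>,
    polled: bool,
}

impl<V: Unpin> Future for PendingFirst<V> {
    type Output = V;
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<V> {
        if !self.polled {
            self.polled = true;
            cx.waker().wake_by_ref();
            return Poll::Pending;
        }
        Poll::Ready(self.value.take().expect("polled after completion"))
    }
}

struct Noop;
impl Wake for Noop {
    fn wake(self: Arc<Self>) {}
}

/// Polls `fut` up to `max` times; returns how many polls it took to complete.
fn drive<F: Future + Unpin>(mut fut: F, max: usize) -> Option<usize> {
    let waker = Waker::from(Arc::new(Noop));
    let mut cx = Context::from_waker(&waker);
    for n in 1..=max {
        if Pin::new(&mut fut).poll(&mut cx).is_ready() {
            return Some(n);
        }
    }
    None
}

fn main() {
    let fut = LockThenWait {
        lock: Some(PendingFirst { value: Some("guard"), polled: false }),
        guard: None,
        timer: PendingFirst { value: Some(()), polled: false },
    };
    println!("{:?}", drive(fut, 10));
}
```

With two toy futures that each return Pending once, the whole future completes on the third poll: the first poll leaves the lock future pending, the second acquires the guard and starts the timer, and the third sees the timer fire and releases the guard. Applied to SharedWriter, this would presumably mean adding a field for the LockArc (for example a hypothetical `lock_fut: Option<LockArc<W>>`) alongside writer_lock and f.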

throws out the future

It does? How can you throw out the lock_arc()? It's not a polled future yet, so it isn't acquiring anything; it's just cloning the Arc around the mutex.

You need to store the LockArc and reuse it on the next poll(), instead of creating a new one.

Am I not doing that already?

This line stores the acquired lock: *this.writer_lock = Some(futures::ready!(lock.poll(cx)));

Regardless there is no deadlock without the timer. I still don't see how the guard, which appears to be getting polled and acquired and stored correctly, is the problem.

futures::ready!(lock.poll(cx)) polls the LockArc, so it has now been polled. If that poll returns Pending, futures::ready! returns early, and because the LockArc lives in the local variable lock, it is dropped after being polled but before it has completed.

Am I not doing that already?

You're storing the MutexGuardArc<W> guard, if you successfully obtain it. You're not storing the LockArc future by which you obtain the guard. You need to store the LockArc so you can poll it until it returns Ready, then drop it.

This is equivalent to what writer.lock_arc().await would do in an async block — the awaitee is stored in the generated future type.
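A crate-free sketch of that point, assuming Rust 1.68+ for `std::pin::pin!`: the async block below awaits a hypothetical `PendingFirst` future that returns Pending on its first poll, and the compiler-generated future keeps that awaitee inside its own state, so the second poll resumes it rather than starting over.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical toy future: Pending on the first poll, Ready(42) on the next.
struct PendingFirst {
    polled: bool,
}

impl Future for PendingFirst {
    type Output = u32;
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u32> {
        if self.polled {
            return Poll::Ready(42);
        }
        self.polled = true;
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

struct Noop;
impl Wake for Noop {
    fn wake(self: Arc<Self>) {}
}

/// Returns the async block's output and the number of polls it needed.
fn run_async_block() -> (u32, usize) {
    // The awaited PendingFirst is stored inside the async block's state between polls.
    let mut fut = pin!(async { PendingFirst { polled: false }.await + 1 });
    let waker = Waker::from(Arc::new(Noop));
    let mut cx = Context::from_waker(&waker);
    let mut polls = 0;
    loop {
        polls += 1;
        if let Poll::Ready(v) = fut.as_mut().poll(&mut cx) {
            return (v, polls);
        }
    }
}

fn main() {
    println!("{:?}", run_async_block());
}
```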

In the version which does not use the timer, the lock guard is dropped in the same poll() call that acquires it, so there is never any contention for the mutex and lock_arc() can just succeed immediately, without needing to wait for another task to unlock the mutex.

(If you were polling the 3 SharedWriters from separate threads of a multi-threaded executor, this would not be a sufficient condition to avoid contention, but FuturesUnordered doesn't use any threads, just polls one of its futures at a time, so activities that happen entirely within a single poll() call can never overlap with each other.)

I just don't see how that's a problem. LockArc is just an Arc reference, not an acquired lock. Dropping the local variable because the lock can't be acquired yet has no direct consequences.

This is the code for lock_arc():

    pub fn lock_arc(self: &Arc<Self>) -> LockArc<T> {
        LockArc::_new(LockArcInnards::Unpolled {
            mutex: Some(self.clone()),
        })
    }

And in there, self.clone() is just a call to Arc::clone. Nothing locking-related happens here; it just clones an Arc reference.

This is wrong. Please look at the code I provided:

impl<W: AsyncWrite + Unpin> Future for SharedWriter<W> {
    type Output = std::io::Result<()>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut this = self.project();
        let guard = if this.writer_lock.is_none() {
            let lock = this.writer.lock_arc();
            pin_mut!(lock);
            *this.writer_lock = Some(futures::ready!(lock.poll(cx)));
            this.writer_lock.as_mut().unwrap()
        } else {
            this.writer_lock.as_mut().unwrap()
        };

        *this.writer_lock = None; // drop lock
        Poll::Ready(Ok(()))
    }
}

When the lock is acquired, this line succeeds:
*this.writer_lock = Some(futures::ready!(lock.poll(cx)));

and when it succeeds, the guard is stored in *this.writer_lock.

Both versions, with timer and without, use the same exact locking code.

It is a stateful future. The cloned Arc is just its initial state. The only way to successfully acquire the lock is to drive that future to completion. You're not doing that, but giving up after the first poll() instead.

Yes, that code has no direct effects, but that code is not the important part; the important part is what happens when it's polled. There's nontrivial stateful logic there. If that doesn't get polled to completion, then you don't successfully get a mutex guard out of it.
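A crate-free sketch of what "stateful" means here: `PendingFirst` is a hypothetical stand-in for LockArc whose first poll only advances its internal state. `Recreate` mirrors the original SharedWriter by building a fresh inner future on every poll, while `Stored` keeps the same inner future between polls.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stateful inner future: Pending on its first poll, Ready on the second.
struct PendingFirst {
    polled: bool,
}

impl Future for PendingFirst {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.polled {
            return Poll::Ready(());
        }
        self.polled = true;
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

/// Like the original SharedWriter: a fresh inner future on every poll.
struct Recreate;
impl Future for Recreate {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let mut inner = PendingFirst { polled: false };
        Pin::new(&mut inner).poll(cx) // `inner` is dropped on return, losing its progress
    }
}

/// Keeps the inner future across polls until it completes.
struct Stored {
    inner: PendingFirst,
}
impl Future for Stored {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        Pin::new(&mut self.inner).poll(cx)
    }
}

struct Noop;
impl Wake for Noop {
    fn wake(self: Arc<Self>) {}
}

/// True if `fut` returns Ready within `max` polls.
fn completes_within<F: Future + Unpin>(mut fut: F, max: usize) -> bool {
    let waker = Waker::from(Arc::new(Noop));
    let mut cx = Context::from_waker(&waker);
    (0..max).any(|_| Pin::new(&mut fut).poll(&mut cx).is_ready())
}

fn main() {
    println!("recreate: {}", completes_within(Recreate, 100));
    println!("stored:   {}", completes_within(Stored { inner: PendingFirst { polled: false } }, 100));
}
```

Under these assumptions, `Recreate` never completes no matter how often it is polled, while `Stored` completes on its second poll.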

Yes, but in the timer-less version, immediately afterward, in the same SharedWriter::poll() call, the code does *this.writer_lock = None;, so the guard has been dropped immediately after it is acquired. And so there is no lock contention because no other tasks are ever trying to acquire the lock between those two events.

But I am doing that, aren't I? *this.writer_lock = Some(futures::ready!(lock.poll(cx))); drives the future to completion: lock.poll(cx) gets called, which ensures that when the lock is available or released, my own poll impl gets called again and has a chance to acquire the lock.

Let's just work through a fresh solution.

#[pin_project]
struct Me {
    #[pin] mutex: Arc<Mutex<()>>,
    guard: Option<MutexGuardArc<()>>,
}

impl Future for Me {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let this = self.project();

        let arc = this.mutex.lock_arc();
        pin_mut!(arc);
        let result = arc.poll(cx);
        if let Poll::Ready(guard) = result {
            *this.guard = Some(guard);
            println!("I got the lock");
            Poll::Ready(())
        } else {
            Poll::Pending
        }
    }
}

arc.poll(cx) tries to acquire the lock. There are two scenarios:

  1. The lock is available: the guard is stored, the lock stays locked, and it prints.
  2. The lock is not available: we return Poll::Pending. The arc variable is gone, but so what? arc.poll(cx) has already done its job: registering another poll call for when the lock becomes available. So the executor will call my own poll method again, and the code above will run again, this time with the lock open, acquiring it and printing as in scenario 1.

You would have to tell me that arc.poll(cx); either doesn't register my future to be polled again when the lock is open, or somehow dropping the LockArc undoes that.

I don't understand. If someone acquires the lock and stores the guard, no one else can use the lock. The poll method can then run to completion while the lock stays locked, exactly as intended. If the lock is held, then lock.poll(cx) in the original code doesn't succeed, as it shouldn't. You were saying that neither happens because

No, it does not. You're polling lock exactly once in that line, and then dropping it (by returning via ready!) if it doesn't return Ready. You need to store lock in your future and poll it again, not recreate it, until it returns Ready. That's what driving it to completion means.

Here, you're making the assumption that LockArc is guaranteed to eventually succeed immediately on a single poll after being created. This is not a documented guarantee of LockArc, and not something you can reasonably assume about an arbitrary future. Futures are allowed to return Pending, meaningfully or spuriously, as many times as they want before returning Ready.

Undoing the registration would be a valid implementation. In fact, not removing registrations could lead to a memory leak, if many futures are cancelled while waiting for the lock; the queue could grow unboundedly as long as the lock was held.
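The mechanism can be modeled without any crates. The sketch below uses a hypothetical single-threaded `ToyMutex` (not async-lock's actual implementation): a pending `ToyLock` registers its waker, its `Drop` removes that registration, and dropping a `ToyGuard` unlocks and wakes one registered waiter. `scenario` polls a lock future once while another holder has the lock, optionally drops it, and then releases the holder.

```rust
use std::cell::RefCell;
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical toy async mutex: a flag plus a map of registered waiters.
#[derive(Default)]
struct ToyMutex {
    locked: bool,
    next_id: usize,
    waiters: HashMap<usize, Waker>,
}

struct ToyLock { m: Rc<RefCell<ToyMutex>>, id: Option<usize> }
struct ToyGuard { m: Rc<RefCell<ToyMutex>> }

impl Future for ToyLock {
    type Output = ToyGuard;
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<ToyGuard> {
        let this = &mut *self;
        let mut m = this.m.borrow_mut();
        if !m.locked {
            m.locked = true;
            if let Some(id) = this.id.take() {
                m.waiters.remove(&id);
            }
            return Poll::Ready(ToyGuard { m: this.m.clone() });
        }
        // Register (or refresh) this future's waker in the waiter map.
        let id = *this.id.get_or_insert_with(|| { m.next_id += 1; m.next_id });
        m.waiters.insert(id, cx.waker().clone());
        Poll::Pending
    }
}

impl Drop for ToyLock {
    // Dropping a pending lock future deregisters its waiter.
    fn drop(&mut self) {
        if let Some(id) = self.id.take() {
            self.m.borrow_mut().waiters.remove(&id);
        }
    }
}

impl Drop for ToyGuard {
    // Unlock, then wake one registered waiter (if any is still registered).
    fn drop(&mut self) {
        let mut m = self.m.borrow_mut();
        m.locked = false;
        let next = m.waiters.keys().next().copied();
        if let Some(w) = next.and_then(|id| m.waiters.remove(&id)) {
            w.wake();
        }
    }
}

/// Waker that counts how often the task would have been rescheduled.
struct CountWake(AtomicUsize);
impl Wake for CountWake {
    fn wake(self: Arc<Self>) { self.0.fetch_add(1, Ordering::SeqCst); }
}

/// Returns (wakeups received, lock acquired on re-poll).
fn scenario(keep_lock_future: bool) -> (usize, bool) {
    let m = Rc::new(RefCell::new(ToyMutex::default()));
    let counter = Arc::new(CountWake(AtomicUsize::new(0)));
    let waker = Waker::from(counter.clone());
    let mut cx = Context::from_waker(&waker);

    let mut first = ToyLock { m: m.clone(), id: None };
    let holder = match Pin::new(&mut first).poll(&mut cx) {
        Poll::Ready(guard) => guard,
        Poll::Pending => unreachable!("mutex starts unlocked"),
    };
    let mut waiting = Some(ToyLock { m: m.clone(), id: None });
    assert!(Pin::new(waiting.as_mut().unwrap()).poll(&mut cx).is_pending());
    if !keep_lock_future {
        waiting = None; // like returning via ready! with the lock future in a local
    }
    drop(holder); // the other task unlocks
    let wakeups = counter.0.load(Ordering::SeqCst);
    let acquired = match waiting.as_mut() {
        Some(lock) => Pin::new(lock).poll(&mut cx).is_ready(),
        None => false,
    };
    (wakeups, acquired)
}

fn main() {
    println!("keep the lock future: {:?}", scenario(true));
    println!("drop the lock future: {:?}", scenario(false));
}
```

With the lock future kept, releasing the holder produces one wakeup and the re-poll acquires the lock; with it dropped, the release wakes nobody, so a real executor would never poll that task again even though the mutex is now free.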

Yes, but in your timer-less code, nobody ever did that. The only times the mutex was ever locked, it was immediately unlocked before anyone else tried to lock it. So, no contention actually occurred.

This is indeed a common issue. I can point you to an example of how this is usually solved. For example, Tokio has a broadcast channel. The Receiver::recv method returns a future, and the future internally stores an entry for a linked list. The linked list contains all futures that are waiting for new messages on the broadcast channel.

If you poll the Receiver::recv future and get Pending, the future is added to the linked list of waiters that will be notified. Unfortunately, if the future is then dropped, it is removed from the linked list, so the relevant waker will not be called when the next message arrives.

To make the broadcast channel work with poll methods, Tokio provides a wrapper that stores the actual future returned by Receiver::recv to avoid giving up the slot in the linked list. You can read the source code of that at tokio-stream/src/wrappers/broadcast.rs.

Is the Receiver::recv future similar to LockArc from the async_lock crate? If dropping the LockArc removes the waiter it registered, then that could explain the problem. As I stated in

Can we work through a simplified example? The other poster is claiming that dropping the LockArc after calling its poll() method somehow prevents the lock from being successfully acquired. I don't believe this, because it seems to me that the LockArc is just a reference whose job is to register a waiter on failure and return a MutexGuard on success. It's the MutexGuard that has to be held as the lock. I can demonstrate this with simple code:

        let data = Arc::new(Mutex::new(()));

        let mut lockarc1 = data.lock_arc();
        let mut lockarc2 = data.lock_arc();

        pin_mut!(lockarc1);
        pin_mut!(lockarc2);

        let guard1 = (&mut lockarc1).await;
        println!("lockarc2 acquiring");
        (&mut lockarc2).await;

let guard1 = (&mut lockarc1).await; acquires the lock and stores the MutexGuard in guard1, the println! prints, and (&mut lockarc2).await; blocks forever since the lock is held. So far so good.

Now let's try that again, but this time dropping the lockarc variable that supposedly needs to be stored between awaits.

        let data = Arc::new(Mutex::new(()));

        let mut lockarc1 = data.lock_arc();
        let mut lockarc2 = data.lock_arc();

        pin_mut!(lockarc1);
        pin_mut!(lockarc2);

        let guard1 = (&mut lockarc1).await;
        println!("lockarc2 acquiring");
        drop(lockarc1);
        (&mut lockarc2).await;

Exact same behavior. Dropping the LockArc has no consequences for acquiring and holding the lock. Yes, this simulates the success case where lockarc1 gets the lock on the first try, but unless LockArc's Drop implementation somehow deregisters its waiter, there is no observable difference. Seriously, if there is an issue with dropping the LockArc after a failed poll, such that the waiter is deregistered, then I would like to see the specific crate code that led someone to that conclusion. (The reason I haven't just looked up the code myself is that it's obfuscated by macros, and I'm not knowledgeable about Rust macros.)

Yes.

This doesn't do the same thing. You don't get past the .await until the future returns Poll::Ready. This means that by the time you call drop(lockarc1) the future has already completed and it's no longer a problem to drop it.

Also, drop(lockarc1) does not drop the future. The type of lockarc1 is Pin<&mut LockArc> and dropping a pinned pointer is a no-op.
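A crate-free illustration of that last point, assuming Rust 1.68+ for `std::pin::pin!`, which, like `futures::pin_mut!`, keeps the value in the enclosing scope and hands you a `Pin<&mut T>`. `Noisy` is a hypothetical type that records when its destructor runs.

```rust
use std::cell::Cell;
use std::pin::pin;

/// Hypothetical type that records when its destructor runs.
struct Noisy<'a>(&'a Cell<bool>);

impl Drop for Noisy<'_> {
    fn drop(&mut self) {
        self.0.set(true);
    }
}

/// Returns (dropped right after `drop(pinned)`, dropped after the enclosing block).
fn pinned_drop_demo() -> (bool, bool) {
    let dropped = Cell::new(false);
    let after_explicit_drop;
    {
        let pinned = pin!(Noisy(&dropped)); // pinned: Pin<&mut Noisy>
        drop(pinned); // drops only the pinned reference, not the Noisy value
        after_explicit_drop = dropped.get();
    } // the Noisy value itself is dropped here, at the end of its scope
    (after_explicit_drop, dropped.get())
}

fn main() {
    println!("{:?}", pinned_drop_demo());
}
```

The explicit `drop` only discards the pinned reference, so the function returns `(false, true)`: the value's destructor runs when the enclosing block ends.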

It's impossible to reproduce the problem with async/await syntax. You can only do it in poll methods.

Okay, but that's what I was saying. The only conceivable way that dropping the LockArc could be a problem is that it deregisters a waiter. The only way. Dropping a reference whose job is to register data in another long-lived object (the data used for notification) has no consequences on that data unless the data is explicitly deleted when drop is called. That's what I was trying to get at, but I kept getting conflicting and contradictory information.

Okay so I should store the LockArc, not because dropping it somehow drops the lock it never acquires, but because it deregisters a notification that it inserted into another object. Got it.

Yes. If the LockArc returns Pending then it has not acquired the lock. Instead, it has added itself to the queue of LockArc futures waiting for the lock. The relevant destructor is here:

The LockArc would also be allowed to return Pending on the first poll for no reason at all. It doesn't, in the current version of the library, but in order to write a correct program, you need to tolerate such changes in behavior from futures you didn't write.

This discussion has been largely about reasoning about the implementation details of a type from a library. That is an appropriate strategy for debugging — understanding why your program hung — but it is not an appropriate strategy for writing correct code. You should poll the LockArc to completion because that's how you work with an arbitrary Future.
