Is there a chance to encounter "Poll::Pending" on Mutex/BiLock when polling in sequence?

I didn't quite know what title to pick for this question, so let me explain what's confusing me. Futures again, haha :). The more I work with them, the more I feel like I'm writing normal blocking/sequential code. It feels that way, but at the same time I understand that the code is run by threads in the background.

The following example shows how I designed the HTTP/2 message delivery from connection to stream. The Store object is used to send data from the Connection to the Stream: a connection writes data to the store and then polls the stream, and that poll makes the stream read the data back out of the store.

use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

pub struct Store {
    pub data: Option<u32>,
}

/// STREAM

pub struct Stream {
    pub store: Arc<Mutex<Store>>, // Mutex or BiLock 
}
impl futures::stream::Stream for Stream {
    type Item = u32;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        let this = Pin::into_inner(self);
        let data = match Pin::new(&mut this.store).poll_lock(cx) {
            Poll::Ready(mut store) => store.data.take(), // read data sent from connection
            Poll::Pending => { /* IS IT POSSIBLE? */ None },
        };
        Poll::Ready(data)
    }
}

/// CONNECTION

pub struct Connection {
    pub store: Arc<Mutex<Store>>, // Mutex or BiLock 
    pub stream: Stream,
}
impl futures::stream::Stream for Connection {
    type Item = ();

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        let this = Pin::into_inner(self);
        match Pin::new(&mut this.store).poll_lock(cx) {
            Poll::Ready(mut store) => { // set data for stream to read
                store.data = Some(10);
            },
            Poll::Pending => { /* IS IT POSSIBLE? */ },
        };

        match Pin::new(&mut this.stream).poll_next(cx) {
            ...
        };

        Poll::Ready(Some(()))
    }
}

This works perfectly well, but it seems that I never encounter the Poll::Pending result on the store object. I wonder if this is just a coincidence, or whether the code actually runs in sequence so the result can never be Poll::Pending. Where do the threads come in, or rather, how does a part of the code actually become async in the background?

BiLock::poll_lock is documented as follows:

If the lock is already held then this function will return Poll::Pending.

Considering the overall simplicity of the code that runs while the lock is acquired, it's unlikely that the lock will still be held when it is polled. However, it can still happen, if rarely, when running under a multi-threaded executor (and chances are you will be using a multi-threaded executor). In your case, Poll::Pending should be handled by returning Poll::Pending (this can be done with the ready! macro from the futures crate if you prefer).

Please note that poll_next will be called again after it returns Poll::Pending, so the second Stream implementation in your post should keep track of the state it is in, to avoid writing to the store a second time when this.stream.poll_next returns Poll::Pending.
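
For illustration, here is a minimal, self-contained sketch of that idea (not your actual code). It assumes the simplified types from your post, uses futures 0.3's BiLock (which may require enabling the crate's bilock feature), and adds a hypothetical wrote flag so a repeated poll does not write to the store twice:

use std::pin::Pin;
use std::task::{Context, Poll};

use futures::lock::BiLock;
use futures::ready;
use futures::stream::Stream as _; // bring the trait into scope without clashing with the struct name

pub struct Store {
    pub data: Option<u32>,
}

pub struct Stream {
    pub store: BiLock<Store>,
}

impl futures::stream::Stream for Stream {
    type Item = u32;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = Pin::into_inner(self);
        // If the other side currently holds the lock, propagate Pending.
        let mut store = ready!(this.store.poll_lock(cx));
        Poll::Ready(store.data.take())
    }
}

pub struct Connection {
    pub store: BiLock<Store>,
    pub stream: Stream,
    wrote: bool, // hypothetical flag: did we already write this tick's value?
}

impl futures::stream::Stream for Connection {
    type Item = ();

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = Pin::into_inner(self);

        if !this.wrote {
            // ready! returns Poll::Pending from this function if the lock is
            // not available; the executor will poll us again later.
            let mut store = ready!(this.store.poll_lock(cx));
            store.data = Some(10);
            this.wrote = true;
            // The guard is dropped here, before the stream is polled.
        }

        // Because `wrote` was set above, a re-poll after a Pending from the
        // stream will not write to the store a second time.
        let item = ready!(Pin::new(&mut this.stream).poll_next(cx));
        this.wrote = false; // reset for the next tick
        Poll::Ready(item.map(|_| ()))
    }
}

The point is that ready! turns a Poll::Pending from the lock into an early return from poll_next, while the wrote flag records that the write already happened when the function is polled again.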

Thanks, @xfix. My code example is intentionally very simple, but at the same time it shows exactly how the two objects exchange messages. I understand that by manually adding threads, the Poll::Pending state would be rare. Where I'm missing knowledge is in how the example will be handled by a multi-threaded executor. I assume that every poll is executed on a thread, so race conditions are inevitable.

What I'm trying to get is a simple mathematical answer, 0 or 1, telling me whether the Poll::Pending result is, in this specific example, possible or not. I think your answer is "it is possible", but because the example is so simple the event will be rare. So basically, there is no case in the async world where Poll::Pending could be ignored. Right?

Some questions.

  • Are you running this? Are you using tokio? Are you using either the full or rt-threaded feature? (If not, then your code might be running single-threaded and Pending will never occur. EDIT: This is wrong. See @Hyeonu's comment.)
  • In general, how is this code called? Is it possible that what's driving the async code is effectively running the connection and stream sequentially? A bit more code showing how Stream/Connection are used would be helpful to see.

Single-threaded doesn't change anything. It's an async mutex; it can still return Poll::Pending if some other task holds the mutex guard across an .await point.
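
To make that concrete, here is a small, self-contained sketch of that situation (mine, not from this thread; it assumes tokio with the full feature set plus the futures crate). One task holds a futures::lock::Mutex guard across an .await point, so another task that tries to lock in the meantime has to wait, which at the poll level means Poll::Pending until the guard is dropped:

use std::sync::Arc;
use std::time::Duration;

use futures::lock::Mutex;

#[tokio::main]
async fn main() {
    let store = Arc::new(Mutex::new(0u32));

    let holder = {
        let store = Arc::clone(&store);
        tokio::spawn(async move {
            let mut guard = store.lock().await;
            // The guard is held across this await point, so any other task
            // that tries to lock in the meantime is woken only once the
            // guard is dropped.
            tokio::time::sleep(Duration::from_millis(50)).await;
            *guard += 1;
        })
    };

    let contender = {
        let store = Arc::clone(&store);
        tokio::spawn(async move {
            // If `holder` already owns the lock and is sleeping with the
            // guard, this lock() is pending until the guard is released.
            let guard = store.lock().await;
            println!("acquired: {}", *guard);
        })
    };

    let _ = tokio::join!(holder, contender);
}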

2 Likes

Depends on what you mean by ignored. You're not required to return Poll::Pending when you get Poll::Pending from one of your inner futures (but usually that's what you want).

Implementing a timeout is a pretty good example of that. If your main future returns Poll::Pending, then you would check a timer future next and use its result instead.
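
As an illustration, here is a minimal sketch of that pattern (not from this thread; it assumes tokio's timer and boxes both futures for simplicity):

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;

/// Resolves to Some(output) if the inner future finishes first, or None on timeout.
pub struct Timeout<F> {
    future: Pin<Box<F>>,
    timer: Pin<Box<tokio::time::Sleep>>,
}

pub fn timeout<F: Future>(future: F, after: Duration) -> Timeout<F> {
    Timeout {
        future: Box::pin(future),
        timer: Box::pin(tokio::time::sleep(after)),
    }
}

impl<F: Future> Future for Timeout<F> {
    type Output = Option<F::Output>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Poll the main future first; its Poll::Pending is not returned directly.
        if let Poll::Ready(value) = self.future.as_mut().poll(cx) {
            return Poll::Ready(Some(value));
        }
        // The main future is pending, so consult the timer instead.
        match self.timer.as_mut().poll(cx) {
            Poll::Ready(()) => Poll::Ready(None), // timed out
            Poll::Pending => Poll::Pending,       // neither side is ready yet
        }
    }
}

In practice futures::future::select or tokio::time::timeout give you this behaviour without hand-writing the poll; the sketch only shows what happens underneath.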

That said, your example doesn't make a lot of sense to me. Is a single Store only shared between Stream and Connection? If so, I don't think you actually can get Poll::Pending from .poll_lock(), because you're not exiting the function (nor calling the other .poll_next()) while the lock is still held.

Thank you all for your inputs. The code will be open-sourced when it's ready so consider it as a shared effort :).

@drewkett, since I'm writing a library, the user will be the one who chooses the runtime. The connection loop is implemented as a stream that handles and polls other objects. The connection itself is executed in a while loop:

while let Some(traffic) = connection.next().await {
   println!(...);
}

In each tick, the connection will try to read an incoming frame, it will pass the frame to a stream (through a dedicated store) and will send/flush possible outgoing frames:

Pin::new(self.socket) & POLL TRY-READ INCOMING FRAME
Pin::new(self.stream{i}) & POLL ACTIVE STREAM
Pin::new(self.socket) & POLL FLUSH OUTGOING FRAMES
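
In Rust terms, the per-tick order looks roughly like the sketch below. It is illustrative only: Socket, Frame, poll_read_frame and poll_flush_frames are hypothetical stand-ins, not the real types or methods from my code.

use std::pin::Pin;
use std::task::{Context, Poll};

use futures::stream::Stream;

pub struct Frame;

pub struct Socket;

impl Socket {
    // Hypothetical: try to read one incoming frame without blocking.
    fn poll_read_frame(&mut self, _cx: &mut Context<'_>) -> Poll<Option<Frame>> {
        Poll::Ready(None)
    }
    // Hypothetical: flush any queued outgoing frames.
    fn poll_flush_frames(&mut self, _cx: &mut Context<'_>) -> Poll<()> {
        Poll::Ready(())
    }
}

pub struct ActiveStream;

impl Stream for ActiveStream {
    type Item = u32;
    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<u32>> {
        Poll::Ready(None)
    }
}

pub struct Connection {
    socket: Socket,
    stream: ActiveStream,
}

impl Stream for Connection {
    type Item = ();

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = Pin::into_inner(self);
        // 1. try to read an incoming frame (and hand it to the stream's store)
        let _incoming = this.socket.poll_read_frame(cx);
        // 2. poll the active stream so it can consume data from its store
        let _item = Pin::new(&mut this.stream).poll_next(cx);
        // 3. flush any outgoing frames
        let _flushed = this.socket.poll_flush_frames(cx);
        Poll::Ready(Some(()))
    }
}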

@Hyeonu that's what I'm curious about. In the code, I actually use BiLock because the store object is only shared between the connection and the stream objects. For each stream, there is a dedicated store object. I think you added another fragment to my understanding here.

@rschoon

This is exactly what I'm trying to understand :). Is there a theoretical chance that poll_lock returns Poll::Pending or not - 0 or 1 :)? Based on the other comments, I'd assume there is a chance - so 1 ;).

Let me try to rephrase my question. What exactly happens when poll_xxx is called? Is that poll executed on another thread, or is it a plain synchronous function call? I guess this is where the async logic is triggered and the code runs on another thread, but it could already be finished by the time Poll::Ready is returned in this case. Right? I guess it depends on the runtime you use, but let's always consider the edge case, so all the possibilities. Does the poll_ call pick some available thread, or is it rather that inside the poll function a developer could spawn a thread and we don't know that, so we just treat it as if there were one ;)? If the execution of a poll_ always stays on a single thread, I don't have to use the async version of Mutex and I'd be better off with the standard std version. Is the .await the point where a thread is chosen and the rest of the code is executed on the selected thread? So many questions :). Anyone?

Guys, I just talked to my friend @alice who helps us all here with good answers. Here's the statement:

by @alice: Thread move only happens after the future has returned from poll, giving control of the thread back to the executor. At this point, the future is just a value that can be manipulated like any other.

My example above can be treated as normal sync code, and an async Mutex is not needed. It seems there is no magic behind the scenes and I was just too suspicious :).

In the example above, indeed, if those are the only two things accessing the store, they will never step on each other's toes, and you should just use an ordinary std mutex.
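
A minimal sketch of what that could look like, reusing the simplified Stream from the first post with a std::sync::Mutex (my own illustration, not the actual library code):

use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};

pub struct Store {
    pub data: Option<u32>,
}

pub struct Stream {
    pub store: Arc<Mutex<Store>>,
}

impl futures::stream::Stream for Stream {
    type Item = u32;

    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = Pin::into_inner(self);
        // The lock is only held inside this call, never across a poll
        // boundary, so a blocking std mutex is enough here.
        let mut store = this.store.lock().unwrap();
        // Note: returning Ready(None) ends the stream; the real code would
        // presumably return Pending (and arrange a wakeup) when there is
        // nothing to read yet.
        Poll::Ready(store.data.take())
    }
}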

1 Like