Implementation of async `Stream` never gets polled after the first time

Hello all, needing a little help here with asynchrony.

Preamble

I'm a long-winded person, but I'm going to give you the rundown so that you aren't super confused and trying to get familiar with the code on your own. You're going to open the code in your browser and then realize that it needs to be cloned and run anyway, so you may appreciate the thorough debriefing beforehand. If you want a description of the problem right away, see the section titled The Problem.

I have a REST API endpoint that returns search results and I need to paginate over that. I set out to create an asynchronous Stream that can handle that. My first attempt looked very much like the code that will follow, but somebody on Zulip suggested I use the try_stream! macro from the async-stream crate. I made the thing a second time with that and decided that it coupled the code together way too much for my taste, so I started from scratch to avoid the lifetime issues I was having with my first approach. Please do not suggest going back to that macro, I will not.

Design

I played around with a few designs for a rustaceous API for keeping the paginator generic and easy to implement for any HTTP library and REST API. I settled on the following design--if you have a better suggestion, I've probably tried it, but shoot anyway and we can discuss.

Delegate

There is a PaginationDelegate trait that can be implemented on a type that handles three basic things.

  • Keeping track of the current offset, which is the index of the last item on the previous page. It also serves as the index before the first item on the next page. The implementer can do a little math if they need a concrete page number (which is shown in the example to come).

    • fn offset(&self) -> usize

    • fn set_offset(&mut self) -> usize

  • Keeping track of the total results that the API has for us to paginate over. This is needed to know the upper limit, at which point the stream is closed and Poll::Ready(None) is yielded.

    • fn total_items(&self) -> Option<usize>
  • Make the request for the next page. The offset is handled for this method by the PaginatedStream, so the only inherent responsibility of this method is actually making the request and returning the future.

    It is also necessary, if this isn't already known, to update an internal field that total_items will need to read from. If the stream gets to the end of the first page of results, and that method returns None, the stream will continue forever making new requests. The upper limit will be treated as usize::MAX. This would be considered undefined behavior, except it is defined here.

    • async fn next_page(&mut self) -> Result<Vec<Self::Item>, Self::Error>
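A minimal sketch of the trait as described above, using only the standard library. The `value` parameter on `set_offset`, its "returns the previous offset" behaviour, and the names `FakeSearch`, `page_number` and `block_on` are assumptions made for illustration; they are not taken from the repository. `FakeSearch` stands in for a real HTTP client, and `page_number` shows the "little math" for turning an offset into a page number.

```rust
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Sketch of the trait from the description. The `value` parameter and the
// returned previous offset are assumptions, not the crate's confirmed API.
#[allow(async_fn_in_trait)]
pub trait PaginationDelegate {
    type Item;
    type Error;
    fn offset(&self) -> usize;
    fn set_offset(&mut self, value: usize) -> usize;
    fn total_items(&self) -> Option<usize>;
    async fn next_page(&mut self) -> Result<Vec<Self::Item>, Self::Error>;
}

// Hypothetical in-memory delegate standing in for a real HTTP client.
pub struct FakeSearch {
    data: Vec<u32>,
    per_page: usize,
    offset: usize,
    total: Option<usize>,
}

impl FakeSearch {
    pub fn new(data: Vec<u32>, per_page: usize) -> Self {
        Self { data, per_page, offset: 0, total: None }
    }

    // The "little math": 1-based page number of the next request.
    pub fn page_number(&self) -> usize {
        self.offset / self.per_page + 1
    }
}

impl PaginationDelegate for FakeSearch {
    type Item = u32;
    type Error = String;

    fn offset(&self) -> usize {
        self.offset
    }

    fn set_offset(&mut self, value: usize) -> usize {
        std::mem::replace(&mut self.offset, value)
    }

    fn total_items(&self) -> Option<usize> {
        self.total
    }

    async fn next_page(&mut self) -> Result<Vec<u32>, String> {
        // Record the total so the stream knows where to stop.
        self.total = Some(self.data.len());
        let start = self.offset.min(self.data.len());
        let end = (start + self.per_page).min(self.data.len());
        Ok(self.data[start..end].to_vec())
    }
}

// Tiny busy-polling executor, only suitable for futures that never wait.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

pub fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}
```

In this sketch the caller plays the role of the stream: it calls `next_page`, then advances the offset itself with `set_offset`, which matches the description that the offset is handled outside of `next_page`.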

Stream

The star of the show is PaginatedStream, an enumeration that both implements the Stream trait and keeps track of the state machine. Each variant is a structure, with fields relevant to the state.

Variants:

  • Request - The first state, owns the delegate and nothing else. When the stream is polled, it immediately uses the delegate for next_page and sets the state to Pending, yielding Poll::Pending.

  • Pending - A request has been made but has not yet resolved. This will poll the field future to determine the control flow.

    • If the future is Poll::Ready containing an Ok, unpack the tuple that the future returns.

      This would be (D, Vec<T>) where D is the delegate and T is the item that is yielded from the stream.

      Update the delegate's page offset with set_offset and collect the results and the delegate into the new state, Ready, and yield the first item from the results.

    • If the future is Poll::Ready containing an Err, set the stream's state to Closed and yield the error as owned.

    • If the future is Poll::Pending, retain the current state and yield Poll::Pending.

  • Ready - A request has previously been made, and there may be items to yield. This will pop the item at the front of the buffer and determine what to do with it.

    • There is an item, retain the Ready state with the buffer of items (field items) one less in length.

    • Popping an item returned None, and the stream has not reached the limit D::total_items. Set the state to Request and the stream will poll itself again.

    • If the stream has reached that limit, set the state to Closed and yield None.

  • Closed - The stream has either yielded all of its expected items, or has encountered--and yielded--an error. Return Poll::Ready(None).

  • Indeterminate - Used internally as a placeholder value while the state is yet to be determined, this is unreachable!().
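The variants above could be laid out roughly as follows. This is a sketch under assumptions: `Delegate` is a hypothetical stand-in with public fields instead of the trait, the error type is fixed to `String`, and `step_ready` is an illustrative helper that handles only the Ready state. It mainly shows the `std::mem::replace` trick that makes the Indeterminate placeholder necessary: the old state is moved out by value so its fields can be moved into the next state.

```rust
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;

// Hypothetical stand-in for a type implementing PaginationDelegate.
pub struct Delegate {
    pub offset: usize,
    pub total: Option<usize>,
}

pub enum PaginatedStream<T> {
    Request { delegate: Delegate },
    // The future owns the delegate while in flight and hands it back.
    Pending { future: Pin<Box<dyn Future<Output = Result<(Delegate, Vec<T>), String>>>> },
    Ready { delegate: Delegate, items: VecDeque<T> },
    Closed,
    // Placeholder that only exists while the next state is being computed.
    Indeterminate,
}

impl<T> PaginatedStream<T> {
    /// Handles only the Ready state; returns the yielded item, if any.
    pub fn step_ready(&mut self) -> Option<T> {
        // Move the current state out so its fields can be reused by value.
        match std::mem::replace(self, Self::Indeterminate) {
            Self::Ready { delegate, mut items } => match items.pop_front() {
                // An item is available: stay in Ready with a shorter buffer.
                Some(item) => {
                    *self = Self::Ready { delegate, items };
                    Some(item)
                }
                // Buffer is empty but the limit is not reached: request more.
                // An unknown total is treated as usize::MAX.
                None if delegate.offset < delegate.total.unwrap_or(usize::MAX) => {
                    *self = Self::Request { delegate };
                    None
                }
                // Buffer is empty and the limit is reached: close the stream.
                None => {
                    *self = Self::Closed;
                    None
                }
            },
            // Any other state is put back untouched.
            other => {
                *self = other;
                None
            }
        }
    }
}
```

Every arm writes a real state back into `*self` before returning, which is why Indeterminate should never be observed from outside.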

The Problem

Here are links to the relevant code segments that I will refer to. The example iterates over results from searching for issues from the GitHub API.

Because of the arbitrary, stupid, rule that new users can only have two links, you're going to have to manually locate code in the repository.

  • (1) Example implementation of PaginationDelegate: examples/pagination_example/src/github.rs#L109-L147

    • Makes requests via Client::search_issues: examples/pagination_example/src/github.rs#L30-L42

  • (2) Where the PaginatedStream wraps the future returned from PaginationDelegate::next_page: src/paginator.rs#L140-L146

  • (3) The example that showcases the use of this crate: [examples/pagination_example/src/bin/github.rs#L21-L42](https://github.com/spikespaz/awaur/blob/master/examples/pagination_example/src/bin/github.rs#L21-L42)

When the example tries to iterate over the results (3), the program just runs forever, using no system resources. No CPU, so no busy loop, and no network activity, so no requests. It appears to have gone full-zombie. The conclusion that I have come to is that the asynchronous executor simply never wakes the future contained in PaginatedStream::Pending.

When using the CLion debugger, I notice the following control flow:

  • The stream is initialized properly with the Request state.

  • The stream is polled, going into the handling for the Request state where the future is created and pinned on the heap (2).

  • The stream sets the state to Pending and yields Poll::Pending as expected.

  • The next iteration of the loop (3) never happens. The stream stays on Pending forever, and the future from PaginationDelegate::next_page is never polled (1).

This is what I need help wrapping my head around. I see no reason that the future should never be awakened. Is it because of the nesting, where it is wrapped (2)? Is it because it is pinned to the heap? I do wonder if it happens because of the wrapping, which I would like to avoid so that the delegate stays separate from the future. I have also asked a question about this on Stack Overflow.

Improvements to Make

  • Flatten the PaginatedStream::Pending structure so that the delegate remains owned by the stream, not by the future. This would mean that the future wouldn't have to be wrapped, and that methods such as size_hint can use it to return information while the state is Pending instead of just Request and Ready.

  • Handle an API max limit for results without tainting the information provided by PaginationDelegate::total_items.

  • Provide a method on the PaginatedStream that provides an Option<&PaginationDelegate> so that its getter methods can be accessed, and an unsafe method for Option<&mut PaginationDelegate>.

On line 151 you return Poll::Pending without having polled any futures or otherwise woken the waker in the context. You have to do one of those to get polled again.

The easiest fix is probably to add cx.waker().wake_by_ref() before returning Poll::Pending, but it is more performant to wrap the match in a loop and continue on line 151. Another option is to have the function call itself recursively.
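A minimal sketch of the difference that cx.waker().wake_by_ref() makes, using a hand-rolled counting waker in place of a real executor. `TwoStep`, `CountingWaker` and `poll_once` are hypothetical names for illustration, not code from the repository. `TwoStep` mimics the Request state: its first poll changes state and returns Poll::Pending without polling any inner future.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Counts how often the task asks to be polled again.
pub struct CountingWaker(pub AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// First poll switches state and returns Pending; second poll completes.
pub struct TwoStep {
    pub started: bool,
    /// Whether to request another poll before returning Pending.
    pub wake_self: bool,
}

impl Future for TwoStep {
    type Output = &'static str;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.started {
            return Poll::Ready("done");
        }
        self.started = true;
        if self.wake_self {
            // Tell the executor that this task can make progress right away.
            cx.waker().wake_by_ref();
        }
        // Without the wake above, nothing will ever schedule another poll.
        Poll::Pending
    }
}

/// Polls once and reports the result and the number of wake-ups requested.
pub fn poll_once(fut: &mut TwoStep) -> (Poll<&'static str>, usize) {
    let counter = Arc::new(CountingWaker(AtomicUsize::new(0)));
    let waker = Waker::from(counter.clone());
    let mut cx = Context::from_waker(&waker);
    let result = Pin::new(fut).poll(&mut cx);
    (result, counter.0.load(Ordering::SeqCst))
}
```

With `wake_self: false` the first poll returns Pending and the wake count stays at zero, so an executor has no reason to poll the task again. That matches the hang described in the question. With `wake_self: true` one wake-up is recorded and the next poll completes.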

I've just noticed that you posted this in several places. Please tell us about it when you do that so I don't waste my time answering a question if it has already been answered.

Not all questions have been answered.

What are the missing questions?

At the bottom is a Stack Overflow link, that it seems nobody cares about. That to me is a bigger issue, I need the delegate to remain under the ownership of the stream for the improvements listed last.

Also, you didn't waste your time, the loop thing I may have questions about. I'm looking at it now, but not sure what you mean.

Regarding the Stack Overflow link, the question appears to be "why can't I keep delegate and future in two separate fields?". The answer is that future contains a reference to delegate, making your struct self-referential. Rust does not provide any (safe) way to write your own self-referential structs.
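For illustration, here is a sketch of the safe alternative that the design section already describes: move the delegate into the future and have the future hand it back together with the page. `Delegate`, `request`, `PageFuture` and `block_on` are hypothetical names, and the delegate here is a plain struct rather than the trait from the repository.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical delegate whose request method needs `&mut self`.
pub struct Delegate {
    pub pages_served: usize,
}

impl Delegate {
    pub async fn next_page(&mut self) -> Result<Vec<u32>, String> {
        self.pages_served += 1;
        Ok(vec![1, 2, 3])
    }
}

pub type PageFuture = Pin<Box<dyn Future<Output = Result<(Delegate, Vec<u32>), String>>>>;

// A struct holding `delegate: Delegate` next to a future that borrows
// `&mut delegate` would be self-referential. Moving the delegate into the
// async block avoids that: the borrow lives inside the boxed future.
pub fn request(mut delegate: Delegate) -> PageFuture {
    Box::pin(async move {
        let result = delegate.next_page().await;
        // Hand the delegate back together with the page.
        result.map(|items| (delegate, items))
    })
}

// Tiny busy-polling executor, only suitable for futures that never wait.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

pub fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}
```

The cost of this pattern is the one raised in the question: while the request is in flight, the stream has no delegate to read from.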

Then I should like to do it unsafely. Having the delegate be owned by the stream is necessary to get certain read-only information that is guaranteed not to be changing (with the exception of total_items) while the state is Pending. All of the methods of the delegate that are getters should be accessible by a reference to the delegate, which I would like to hand out. I don't see a problem with providing these methods, the only issue I can see is that the next_page method does need to mutate, and I am not familiar enough with smart pointers to know which one to use. On the first implementation I actually did find a solution to this on my own but can't remember what it was, and no matter what I try, can't seem to stumble over it again.

I'm not too sure what you mean about using a loop around the match. What would that accomplish? Wouldn't that just keep churning the logic and block the executor until an explicit return?

To this I would like to add that, unfortunately, since your future takes a mutable reference to the delegate, it's impossible to do this correctly. Mutable references must be exclusive, so any other access to the delegate while the future exists is undefined behavior.

You would only ever go around the loop a small fixed number of times, so you aren't actually blocking the runtime. In the cases where you received a Poll::Pending by polling a future, it is ok to return Poll::Pending.
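A sketch of the loop approach on a deliberately simplified state machine. `Pager`, `fetch`, `drain`, `TOTAL` and `PER_PAGE` are hypothetical, the items are plain integers, and the "request" is an async function that is ready immediately. The point is the control flow: state changes that polled nothing go around the loop, and Poll::Pending is only returned after an inner future returned it.

```rust
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

const TOTAL: u32 = 4;
const PER_PAGE: u32 = 2;

type PageFuture = Pin<Box<dyn Future<Output = Vec<u32>>>>;

// Hypothetical page source standing in for an HTTP request.
async fn fetch(offset: u32) -> Vec<u32> {
    (offset..(offset + PER_PAGE).min(TOTAL)).collect()
}

pub enum Pager {
    Request { offset: u32 },
    Pending { offset: u32, future: PageFuture },
    Ready { offset: u32, items: VecDeque<u32> },
    Closed,
}

impl Pager {
    pub fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<u32>> {
        loop {
            match std::mem::replace(self, Pager::Closed) {
                Pager::Request { offset } => {
                    // Nothing has been polled yet, so returning Pending here
                    // would hang. Go around the loop and poll the new future.
                    *self = Pager::Pending { offset, future: Box::pin(fetch(offset)) };
                }
                Pager::Pending { offset, mut future } => match future.as_mut().poll(cx) {
                    Poll::Ready(items) => {
                        let offset = offset + items.len() as u32;
                        *self = Pager::Ready { offset, items: items.into() };
                    }
                    Poll::Pending => {
                        // The inner future saw `cx`, so it is responsible for
                        // waking the task. Returning Pending is fine here.
                        *self = Pager::Pending { offset, future };
                        return Poll::Pending;
                    }
                },
                Pager::Ready { offset, mut items } => match items.pop_front() {
                    Some(item) => {
                        *self = Pager::Ready { offset, items };
                        return Poll::Ready(Some(item));
                    }
                    // Limit reached: the state is already Closed.
                    None if offset >= TOTAL => return Poll::Ready(None),
                    None => *self = Pager::Request { offset },
                },
                Pager::Closed => return Poll::Ready(None),
            }
        }
    }
}

struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls until the pager stops yielding items and collects them.
pub fn drain(mut pager: Pager) -> Vec<u32> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut out = Vec::new();
    while let Poll::Ready(Some(item)) = pager.poll_next(&mut cx) {
        out.push(item);
    }
    out
}
```

Each call to `poll_next` passes through only a handful of states before it either yields an item, closes, or returns the Pending it got from the inner future, so the loop does not block the executor.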
