Hello all, needing a little help here with asynchrony.
Preamble
I'm a long-winded person, but I'm going to give you the rundown so that you aren't super confused and trying to get familiar with the code on your own. You're going to open the code in your browser and then realize that it needs to be cloned and run anyway, so you may appreciate the thorough debriefing beforehand. If you want a description of the problem right away, see the section titled The Problem.
I have a REST API endpoint that returns search results, and I need to paginate over them. I set out to create an asynchronous `Stream` that can handle that. My first attempt looked very much like the code that will follow, but somebody on Zulip suggested I use the `try_stream!` macro from the `async-stream` crate. I made the thing a second time with that macro and decided that it coupled the code together way too much for my taste, so I started from scratch, trying to avoid the lifetime issues I was having with my first approach. Please do not suggest going back to that macro; I will not.
Design
I played around with a few designs for a rustaceous API for keeping the paginator generic and easy to implement for any HTTP library and REST API. I settled on the following design--if you have a better suggestion, I've probably tried it, but shoot anyway and we can discuss.
Delegate
There is a `PaginationDelegate` trait that can be implemented on a type that handles three basic things.
- Keeping track of the current offset, which is the index of the last item on the previous page. It serves as the index before the first item on the next page. The implementer can do a little math if they need a concrete page number (which is shown in the example to come).
  - `fn offset(&self) -> usize`
  - `fn set_offset(&mut self, value: usize)`
- Keeping track of the total number of results that the API has for us to paginate over. This is important for knowing the upper limit, at which point the stream will be closed and `Poll::Ready(None)` yielded.
  - `fn total_items(&self) -> Option<usize>`
- Making the request for the next page. The offset is handled for this method by the `PaginatedStream`, so the only inherent responsibility of this method is actually making the request and returning the future. If the total isn't already known, this method must also update an internal field that `total_items` reads from. If the stream gets to the end of the first page of results and `total_items` still returns `None`, the stream will continue making new requests forever, because the upper limit is treated as `usize::MAX`. This would be considered undefined behavior, except it is defined here.
  - `async fn next_page(&mut self) -> Result<Vec<Self::Item>, Self::Error>`
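A minimal sketch of the trait as listed above, with a hypothetical `VecDelegate` that pages over an in-memory vector instead of calling a REST API. The assumptions are mine, not the crate's: native `async fn` in traits (Rust 1.75 or later), the offset read as the number of items consumed so far, and a 1-based page number derived from it. `poll_once` is a hypothetical helper that polls a future a single time with a no-op waker, which only suffices because `VecDelegate::next_page` never returns `Poll::Pending`.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Sketch of the trait as listed above (native `async fn` in traits needs Rust 1.75+).
trait PaginationDelegate {
    type Item;
    type Error;
    /// Number of items consumed so far (this sketch's reading of "offset").
    fn offset(&self) -> usize;
    fn set_offset(&mut self, value: usize);
    /// `None` until a response has reported the total.
    fn total_items(&self) -> Option<usize>;
    /// Fetches the page after `offset()`; should also record the total.
    async fn next_page(&mut self) -> Result<Vec<Self::Item>, Self::Error>;
}

/// Hypothetical delegate that pages over a vector instead of an HTTP API.
struct VecDelegate {
    data: Vec<u32>,
    per_page: usize,
    offset: usize,
    total: Option<usize>,
}

impl VecDelegate {
    /// The "little math" for APIs that expect a 1-based page number.
    fn page(&self) -> usize {
        self.offset / self.per_page + 1
    }
}

impl PaginationDelegate for VecDelegate {
    type Item = u32;
    type Error = String;

    fn offset(&self) -> usize {
        self.offset
    }

    fn set_offset(&mut self, value: usize) {
        self.offset = value;
    }

    fn total_items(&self) -> Option<usize> {
        self.total
    }

    async fn next_page(&mut self) -> Result<Vec<u32>, String> {
        // A real delegate would request `self.page()` here and read the total
        // from the response; this one just slices its vector.
        self.total = Some(self.data.len());
        let start = self.offset.min(self.data.len());
        let end = (start + self.per_page).min(self.data.len());
        Ok(self.data[start..end].to_vec())
    }
}

/// Hypothetical helper: polls a future once with a waker that does nothing.
/// Adequate here only because `VecDelegate::next_page` never returns `Pending`.
fn poll_once<F: Future>(future: F) -> Option<F::Output> {
    struct NoopWaker;
    impl Wake for NoopWaker {
        fn wake(self: Arc<Self>) {}
    }
    let waker = Waker::from(Arc::new(NoopWaker));
    match pin!(future).poll(&mut Context::from_waker(&waker)) {
        Poll::Ready(output) => Some(output),
        Poll::Pending => None,
    }
}
```

With `per_page: 2` and five items, the first call returns `[1, 2]` and records a total of 5; after `set_offset(2)`, `page()` reports page 2 and the next call returns `[3, 4]`.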
Stream
The star of the show is `PaginatedStream`, an enumeration that both implements the `Stream` trait and keeps track of the state machine. Each variant is a structure, with fields relevant to the state.
Variants:
- `Request`: The first state; it owns the delegate and nothing else. When the stream is polled, it immediately uses the delegate for `next_page`, sets the state to `Pending`, and yields `Poll::Pending`.
- `Pending`: A request has been made but has not yet resolved. This polls the field `future` to determine the control flow.
  - If the future is `Poll::Ready` containing an `Ok`, unpack the tuple that the future returns. This would be `(D, Vec<T>)`, where `D` is the delegate and `T` is the item that is yielded from the stream. Update the delegate's page offset with `set_offset`, collect the results and the delegate into the new state, `Ready`, and yield the first item from the results.
  - If the future is `Poll::Ready` containing an `Err`, set the stream's state to `Closed` and yield the error as owned.
  - If the future is `Poll::Pending`, retain the current state and yield `Poll::Pending`.
- `Ready`: A request has previously been made, and there may be items to yield. This pops the item at the front of the buffer and determines what to do with it.
  - If there is an item, retain the `Ready` state with the buffer of items (field `items`) one item shorter.
  - If popping an item returned `None` and the stream has not reached the limit `D::total_items`, set the state to `Request`, and the stream will poll itself again.
  - If the stream has reached that limit, set the state to `Closed` and yield `None`.
- `Closed`: The stream has either yielded all of its expected items or has encountered (and yielded) an error. Returns `Poll::Ready(None)`.
- `Indeterminate`: Used internally as a placeholder value while the state is yet to be determined; reaching it is `unreachable!()`.
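A std-only sketch of the state machine described by the variant list, under stated assumptions: a local `Stream` trait with the same `poll_next` signature stands in for `futures::Stream`, the delegate trait uses native `async fn` (Rust 1.75 or later), the boxed future owns the delegate and returns it with the page as `(D, Vec<T>)`, `set_offset` advances the offset by the page length, and `VecDelegate` is a hypothetical in-memory delegate. It departs from the description in one place: the `Request` arm does not return `Poll::Pending` itself but loops, so the new future is polled in the same call. Moving the state out with `std::mem::replace` is what the `Indeterminate` placeholder is for.

```rust
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

/// Local stand-in for `futures::Stream`, with the same `poll_next` signature.
trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

trait PaginationDelegate {
    type Item;
    type Error;
    fn offset(&self) -> usize;
    fn set_offset(&mut self, value: usize);
    fn total_items(&self) -> Option<usize>;
    async fn next_page(&mut self) -> Result<Vec<Self::Item>, Self::Error>;
}

/// The boxed future owns the delegate and hands it back with the page.
type PageFuture<D, T, E> = Pin<Box<dyn Future<Output = Result<(D, Vec<T>), E>>>>;

enum PaginatedStream<D: PaginationDelegate> {
    Request { delegate: D },
    Pending { future: PageFuture<D, D::Item, D::Error> },
    Ready { delegate: D, items: VecDeque<D::Item> },
    Closed,
    Indeterminate,
}

// Sound: no field is ever pin-projected, and the future is already boxed.
impl<D: PaginationDelegate> Unpin for PaginatedStream<D> {}

impl<D: PaginationDelegate + 'static> Stream for PaginatedStream<D> {
    type Item = Result<D::Item, D::Error>;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        loop {
            match std::mem::replace(this, PaginatedStream::Indeterminate) {
                PaginatedStream::Request { mut delegate } => {
                    let future: PageFuture<D, D::Item, D::Error> = Box::pin(async move {
                        let result = delegate.next_page().await;
                        result.map(|items| (delegate, items))
                    });
                    // Loop instead of returning Poll::Pending, so the new future is
                    // polled right away and can register the waker from `cx`.
                    *this = PaginatedStream::Pending { future };
                }
                PaginatedStream::Pending { mut future } => match future.as_mut().poll(cx) {
                    Poll::Ready(Ok((mut delegate, items))) => {
                        delegate.set_offset(delegate.offset() + items.len());
                        *this = PaginatedStream::Ready { delegate, items: items.into() };
                    }
                    Poll::Ready(Err(error)) => {
                        *this = PaginatedStream::Closed;
                        return Poll::Ready(Some(Err(error)));
                    }
                    Poll::Pending => {
                        *this = PaginatedStream::Pending { future };
                        return Poll::Pending;
                    }
                },
                PaginatedStream::Ready { delegate, mut items } => match items.pop_front() {
                    Some(item) => {
                        *this = PaginatedStream::Ready { delegate, items };
                        return Poll::Ready(Some(Ok(item)));
                    }
                    // Buffer drained: request again unless the known total is reached.
                    None if delegate.offset() < delegate.total_items().unwrap_or(usize::MAX) => {
                        *this = PaginatedStream::Request { delegate };
                    }
                    None => *this = PaginatedStream::Closed,
                },
                PaginatedStream::Closed => {
                    *this = PaginatedStream::Closed;
                    return Poll::Ready(None);
                }
                PaginatedStream::Indeterminate => unreachable!("state is always restored"),
            }
        }
    }
}

/// Hypothetical in-memory delegate used in place of an HTTP client.
struct VecDelegate { data: Vec<u32>, per_page: usize, offset: usize, total: Option<usize> }
impl PaginationDelegate for VecDelegate {
    type Item = u32;
    type Error = String;
    fn offset(&self) -> usize { self.offset }
    fn set_offset(&mut self, value: usize) { self.offset = value }
    fn total_items(&self) -> Option<usize> { self.total }
    async fn next_page(&mut self) -> Result<Vec<u32>, String> {
        self.total = Some(self.data.len()); // a real API would report this
        let end = (self.offset + self.per_page).min(self.data.len());
        Ok(self.data[self.offset..end].to_vec())
    }
}
```

Polling the stream repeatedly with `VecDelegate { data: (1..=5).collect(), per_page: 2, offset: 0, total: None }` yields `Ok(1)` through `Ok(5)` over three requests, then `Poll::Ready(None)`.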
The Problem
Here are links to the relevant code segments that I will refer to. The example iterates over the results of searching for issues with the GitHub API.
Because of the arbitrary, stupid rule that new users can only post two links, you're going to have to locate the code in the repository manually.
1. Example implementation of `PaginationDelegate`: examples/pagination_example/src/github.rs#L109-L147
   - Makes requests via `Client::search_issues`: examples/pagination_example/src/github.rs#L30-L42
2. Where the `PaginatedStream` wraps the future returned from `PaginationDelegate::next_page`: src/paginator.rs#L140-L146
3. The example that showcases the use of this crate: [examples/pagination_example/src/bin/github.rs#L21-L42](https://github.com/spikespaz/awaur/blob/master/examples/pagination_example/src/bin/github.rs#L21-L42)
When the example tries to iterate over the results (3), the program just hangs forever without using any system resources: no CPU, so no busy loop, and no network activity, so no requests. It appears to have gone full-zombie. The conclusion that I have come to is that the asynchronous executor simply never wakes the future contained in `PaginatedStream::Pending`.
When using the CLion debugger, I notice the following control flow:
- The stream is initialized properly with the `Request` state.
- The stream is polled, going into the handling for the `Request` state, where the future is created and pinned on the heap (2).
- The stream sets the state to `Pending` and yields `Poll::Pending` as expected.
- The next iteration of the loop (3) never happens. The stream stays on `Pending` forever, and the future from `PaginationDelegate::next_page` is never polled (1).
This is what I need help wrapping my head around. I see no reason why the future would never be awakened. Is it because of the nesting, where it is wrapped (2)? Is it because it is pinned to the heap? I do wonder whether the wrapping is the cause; I would like to avoid it anyway, so that the delegate stays separate from the future. I have also asked a question about this on Stack Overflow.
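The control flow above is consistent with a general rule of Rust's `Future` contract, which may be the explanation here (a hypothesis, not a confirmed diagnosis of this crate): a `poll` that returns `Poll::Pending` is responsible for making sure the `Waker` from its `Context` gets woken later, and executors generally do not poll a task again until that happens. Returning `Poll::Pending` from the `Request` arm before the freshly created future has been polled leaves nothing holding the waker; nesting and heap pinning do not prevent wake-ups by themselves. The std-only sketch below uses hypothetical types: `Gate` stands in for the HTTP request, `CountingWaker` stands in for the executor, and `Wrapper` creates its inner future on the first poll like the `Request` arm.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

/// Waker that only counts how often it is woken (stands in for an executor).
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Hypothetical stand-in for the HTTP request: pending until `open` is called.
#[derive(Clone, Default)]
struct Gate(Arc<Mutex<(bool, Option<Waker>)>>);

impl Gate {
    fn open(&self) {
        let mut state = self.0.lock().unwrap();
        state.0 = true;
        // Only a waker that was registered during a poll can be woken here.
        if let Some(waker) = state.1.take() {
            waker.wake();
        }
    }
}

impl Future for Gate {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let mut state = self.0.lock().unwrap();
        if state.0 {
            return Poll::Ready(());
        }
        state.1 = Some(cx.waker().clone());
        Poll::Pending
    }
}

/// Creates its inner future on the first poll, like the `Request` arm.
/// With `poll_inner_now == false` it returns `Pending` without polling it.
struct Wrapper {
    gate: Gate,
    inner: Option<Gate>,
    poll_inner_now: bool,
}

impl Future for Wrapper {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.inner.is_none() {
            self.inner = Some(self.gate.clone());
            if !self.poll_inner_now {
                return Poll::Pending; // nothing has seen `cx.waker()` yet
            }
        }
        Pin::new(self.inner.as_mut().unwrap()).poll(cx)
    }
}

/// Polls a fresh `Wrapper` once, completes the request, and counts wake-ups.
fn wakeups_after_one_poll(poll_inner_now: bool) -> usize {
    let counter = Arc::new(CountingWaker(AtomicUsize::new(0)));
    let waker = Waker::from(counter.clone());
    let mut cx = Context::from_waker(&waker);
    let gate = Gate::default();
    let mut wrapper = Wrapper { gate: gate.clone(), inner: None, poll_inner_now };
    assert!(Pin::new(&mut wrapper).poll(&mut cx).is_pending());
    gate.open();
    counter.0.load(Ordering::SeqCst)
}
```

`wakeups_after_one_poll(true)` returns 1 and `wakeups_after_one_poll(false)` returns 0: in the second case the request completes but nobody is told, which matches a task that silently stops. Returning `Poll::Pending` from the `Request` arm can also work if it first calls `cx.waker().wake_by_ref()`, at the cost of an extra round trip through the executor.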
Improvements to Make
- Flatten the `PaginatedStream::Pending` structure so that the delegate is owned by the stream state rather than by the future. This would mean that the future wouldn't have to be wrapped, and that methods such as `size_hint` could use the delegate to return information while the state is `Pending`, instead of just `Request` and `Ready`.
- Handle an API's maximum limit on results without tainting the information provided by `PaginationDelegate::total_items`.
- Provide a method on `PaginatedStream` that returns an `Option<&PaginationDelegate>`, so that the delegate's getter methods can be accessed, and an `unsafe` method for `Option<&mut PaginationDelegate>`.
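The first and third items are connected: with the current layout, a getter can reach the delegate only in `Request` and `Ready`, because during `Pending` it has been moved into the boxed future. A minimal sketch of such accessors, assuming a simplified enum generic over the delegate `D`, item `T`, and error `E` (not the crate's actual definition); `delegate` and `delegate_mut` are hypothetical method names.

```rust
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;

/// Trimmed-down version of the state enum, generic over the delegate `D`,
/// item `T`, and error `E`; while `Pending`, the boxed future owns the delegate.
enum PaginatedStream<D, T, E> {
    Request { delegate: D },
    Pending { future: Pin<Box<dyn Future<Output = Result<(D, Vec<T>), E>>>> },
    Ready { delegate: D, items: VecDeque<T> },
    Closed,
    Indeterminate,
}

impl<D, T, E> PaginatedStream<D, T, E> {
    /// Shared access to the delegate, if the stream currently holds it.
    fn delegate(&self) -> Option<&D> {
        match self {
            Self::Request { delegate } | Self::Ready { delegate, .. } => Some(delegate),
            // While `Pending`, the delegate lives inside the future and is unreachable.
            Self::Pending { .. } | Self::Closed | Self::Indeterminate => None,
        }
    }

    /// Exclusive access; `&mut self` already guarantees exclusivity, so no `unsafe`.
    fn delegate_mut(&mut self) -> Option<&mut D> {
        match self {
            Self::Request { delegate } | Self::Ready { delegate, .. } => Some(delegate),
            Self::Pending { .. } | Self::Closed | Self::Indeterminate => None,
        }
    }
}
```

The sketch uses a plain `&mut` method rather than an `unsafe` one: in Rust, `unsafe` marks obligations the compiler cannot check for memory safety, and a `&mut D` borrowed through `&mut self` is already exclusive. Changing the offset mid-stream could still desynchronize the paginator, but that is a logic concern rather than a memory-safety one.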