I have implemented a Stream with a custom done callback, code:
impl<S, F> Stream for StreamWithDoneCallback<S, F>
where
    S: Stream,
    F: FnMut() -> (),
{
    type Item = S::Item;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.project();
        let item = ready!(this.stream.poll_next(cx));
        if item.is_none() { // here, the item is **ALWAYS** some
            (this.done)();
        }
        Poll::Ready(item)
    }
}
Everything works in my test case, but when it is built in release mode and deployed to production, the done callback is never called. I added some logging, and the reason is that item is never None. Why is this happening?
The stream is consumed by Axum: I'm returning the stream as an HTTP body, and when I test in production, the HTTP client receives the entire body, so the body is consumed completely.
I think there are some problems with the assumptions made about Stream. It yields None when the stream has been exhausted, and there are two cases where that will never happen:
- The Stream is an infinite generator. This can be modelled as an infinite loop that always yields Some.
- Your StreamWithDoneCallback (like any Future) can be dropped at any time by the owner, so you are not guaranteed to poll the underlying Stream to exhaustion.
If I had to guess without any other context to go on, I would suspect the second case is occurring, for example if your client closes the connection early.
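If the second case is what is happening, one option is to run the callback from Drop as well, so that it fires whether the stream is exhausted or simply dropped. The following is only a minimal sketch under some assumptions: the Stream trait is a local stand-in with the same shape as the one in the futures crate (so the example needs nothing beyond std), Counter, NoopWake and run are hypothetical helpers for the demo, and the wrapper requires Unpin instead of using pin projection as your code does.

```rust
use std::cell::Cell;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Local stand-in mirroring the shape of futures::Stream (assumption: std only).
trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

// Hypothetical inner stream: yields next..end and is always ready.
struct Counter {
    next: u32,
    end: u32,
}

impl Stream for Counter {
    type Item = u32;
    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<u32>> {
        if self.next < self.end {
            self.next += 1;
            Poll::Ready(Some(self.next - 1))
        } else {
            Poll::Ready(None)
        }
    }
}

// The callback lives in an Option so it runs at most once.
struct StreamWithDoneCallback<S, F: FnMut()> {
    stream: S,
    done: Option<F>,
}

impl<S, F: FnMut()> StreamWithDoneCallback<S, F> {
    fn finish(&mut self) {
        if let Some(mut done) = self.done.take() {
            done();
        }
    }
}

impl<S, F> Stream for StreamWithDoneCallback<S, F>
where
    S: Stream + Unpin,
    F: FnMut() + Unpin,
{
    type Item = S::Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        let item = match Pin::new(&mut this.stream).poll_next(cx) {
            Poll::Ready(item) => item,
            Poll::Pending => return Poll::Pending,
        };
        if item.is_none() {
            this.finish(); // normal exhaustion
        }
        Poll::Ready(item)
    }
}

impl<S, F: FnMut()> Drop for StreamWithDoneCallback<S, F> {
    fn drop(&mut self) {
        self.finish(); // also covers being dropped before None was seen
    }
}

// Waker that does nothing; enough for a stream that is always ready.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Polls at most `take` times, then drops the wrapper.
// Returns (whether None was observed, how many times done ran).
fn run(len: u32, take: u32) -> (bool, u32) {
    let calls = Rc::new(Cell::new(0u32));
    let calls_in_cb = Rc::clone(&calls);
    let mut wrapped = StreamWithDoneCallback {
        stream: Counter { next: 0, end: len },
        done: Some(move || calls_in_cb.set(calls_in_cb.get() + 1)),
    };
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut saw_none = false;
    for _ in 0..take {
        if let Poll::Ready(None) = Pin::new(&mut wrapped).poll_next(&mut cx) {
            saw_none = true;
            break;
        }
    }
    drop(wrapped);
    (saw_none, calls.get())
}

fn main() {
    println!("{:?}", run(3, 10)); // fully consumed
    println!("{:?}", run(3, 2)); // dropped early, like a client disconnect
}
```

In the second call the wrapper never sees None, yet the callback still runs once because of Drop. Note that Drop is synchronous, so if your cleanup is async you would probably need to hand it off (for example to a spawned task) rather than await it there.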
An example of the first case would be if axum is doing something more than naively consuming the stream. It probably needs to chain streams or something in order to ensure clients can stay connected if they don't include a Connection: close request header. Things like that.
Many thanks for answering. Yes, I think it must be the second case: the future has been dropped by axum or tower somehow.
It can't be the first case, because I added a log at the beginning of the poll_next fn, and it is called a finite number of times for every request.
I'm using axum as a reverse proxy. The Stream comes from some endpoint the user requests, and I just bridge the stream to my users. But I have to do some cleanup work when the stream is done.
Things would be much easier if I could run code after axum finishes responding to each request. I asked axum experts and the answer was that there is no way to do it; I would have to write my own request handling loop. So I came up with this solution...
Anyway, I'm going to dig into this more.
I just found that if I make a request to the reverse proxy to download a large file and then hit Ctrl-C in the middle of the download, the log is the SAME as without hitting Ctrl-C... In that case the body is without a doubt not fully consumed and my stream never ends (done is not called), and the future is lost somewhere...
This SO post has some clues about this...