StreamExt in tokio_stream - Rust

Quoting the documentation:

The provided closure is executed over all elements of this stream as they are made available. It is executed inline with calls to poll_next.

I am curious about the definition of "as they are made available". Does this mean:

  1. whenever a Self::Item is pushed on the stream OR
  2. lazily, whenever a T is TAKEN from the stream

My understanding, consistent with the rest of Rust, is that it is (2), but the exact phrasing "as they are made available" is not obvious to me.

EDIT:

As a follow-up, does map keep an "internal buffer" of T values? (Intuition says no, but I am not sure.)

Yes, your understanding is correct: it is (2). You can verify this by looking at the implementation:

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
        self.as_mut()
            .project()
            .stream
            .poll_next(cx)
            .map(|opt| opt.map(|x| (self.as_mut().project().f)(x)))
    }

It simply calls the function f on each item when the inner stream's poll_next returns an item, and immediately returns the result. There is no internal buffering, and there is no way to "reach back" into the stream either. In fact, none of these wrapper futures/streams could do so, because the underlying stream only polls successfully after the value has been taken out of whatever buffer the stream uses.

In other words, the Stream trait is only concerned with "taking", and everything in StreamExt is implemented strictly in terms of that. The only reason there would be internal buffering is if a stream wrapper was trying to store some values to deliberately delay them until later.
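To see the laziness concretely, here is a minimal sketch that uses only the standard library. Everything in it is a stand-in for illustration rather than the real tokio_stream code: the local `Stream` trait only mirrors the shape of the real one, `Counter`, `NoopWake`, and `demo` are hypothetical names, and the `Map` wrapper is simplified by requiring `Unpin` so that no pin projection is needed. It counts how often the closure runs before and after polling.

```rust
use std::cell::Cell;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Stand-in trait with the same shape as the real `Stream` trait (assumption:
// defined locally so the sketch needs no external crates).
trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

// Hypothetical source stream: always ready, yields 0, 1, 2, then ends.
struct Counter {
    next: u32,
}

impl Stream for Counter {
    type Item = u32;
    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<u32>> {
        if self.next < 3 {
            let n = self.next;
            self.next += 1;
            Poll::Ready(Some(n))
        } else {
            Poll::Ready(None)
        }
    }
}

// Simplified map wrapper: it holds the inner stream and the closure, nothing else.
struct Map<St, F> {
    stream: St,
    f: F,
}

impl<St, F, T> Stream for Map<St, F>
where
    St: Stream + Unpin,
    F: FnMut(St::Item) -> T + Unpin,
{
    type Item = T;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
        let this = self.get_mut();
        // Poll the inner stream once; apply `f` only if an item came out.
        Pin::new(&mut this.stream)
            .poll_next(cx)
            .map(|opt| opt.map(|x| (this.f)(x)))
    }
}

// Waker that does nothing, so we can poll by hand without a runtime.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Returns (closure calls before any poll, calls after one poll, all mapped items).
fn demo() -> (usize, usize, Vec<u32>) {
    let calls = Rc::new(Cell::new(0usize));
    let counter = Rc::clone(&calls);
    let mut mapped = Map {
        stream: Counter { next: 0 },
        f: move |x: u32| {
            counter.set(counter.get() + 1);
            x * 10
        },
    };
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);

    let before = calls.get(); // constructing the wrapper ran nothing
    let mut items = Vec::new();
    if let Poll::Ready(Some(v)) = Pin::new(&mut mapped).poll_next(&mut cx) {
        items.push(v);
    }
    let after_one = calls.get(); // exactly one item taken so far
    while let Poll::Ready(Some(v)) = Pin::new(&mut mapped).poll_next(&mut cx) {
        items.push(v);
    }
    (before, after_one, items)
}
```

Calling `demo()` from a `main` function returns `(0, 1, vec![0, 10, 20])`: the closure has not run at all before the first poll, and it has run exactly once after one item has been taken, even though the source stream could have produced all three items immediately.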
