How to accept an async Stream as argument?

I want to pass an async Stream to a function. Which variant is better?

Variant A:

async fn foo(mut some_stream: impl Stream<Item = SomeType> + Unpin) {
    while /* … */ {
        let item_option = some_stream.next().await;
        /* … */
    }
}

Variant B:

async fn foo(some_stream: impl Stream<Item = SomeType>) {
    tokio::pin!(some_stream);
    while /* … */ {
        let item_option = some_stream.next().await;
        /* … */
    }
}

I assume both variants are more or less equally powerful, as in case of Variant A, the caller could always pin! any !Unpin value before passing it, and in case of Variant B, the pin!'ing to the stack doesn't really cause any extra runtime overhead.

To me, Variant B feels more friendly for the caller (but a bit more tedious for the callee). What's the idiomatic way to do this?

This problem reminds me of my previous question about Tokio's AsyncRead/Write(Ext) traits, though the case is perhaps a bit different here, as I do not pass a reference.


I just looked into the documentation of StreamExt::next, which gives an explanation why the stream needs to be Unpin:

Note that because next doesn’t take ownership over the stream, the Stream type must be Unpin. If you want to use next with a !Unpin stream, you’ll first have to pin the stream. This can be done by boxing the stream using Box::pin or pinning it to the stack using the pin_mut! macro from the pin_utils crate.

The bound Self: Unpin also reminds me very much of the Async…Ext traits in Tokio. So maybe it is the same duality here. But still unsure what's best to do (i.e. what is the idiomatic way) in this particular case.

Yes, it seems to be similar to AsyncRead/Write. Looking at these API docs of the respective ...Ext traits again, I guess the convention is to have owned / by-value arguments be without the : Unpin bound, but to give methods with &mut self an Unpin bound (instead of requiring self: Pin<&mut Self>).

Also, note that unlike AsyncRead/AsyncWrite, the Stream trait is actually commonly implemented by types that don't implenent Unpin. As soon as you're applying some combinators like StreamExt::then or StreamExt::filter etc, with an async fn or a closure returning an async block, the resulting stream is !Unpin.

Thus, I'd say that clearly the Variant B is the better alternative.

2 Likes

That feels good to me (so far).

Okay, so that's also a reason to go for Variant B in this particular case.

I still find it a bit annoyingly complex that I have to manually pin! the stream. Of course, within the current design of Rust, it makes sense; but since pinning (to the stack) doesn't come with any overhead (right?), it might be nicer to be able to write something like:

async fn foo(#[pin] some_stream: impl Stream<Item = SomeType>) {
    while /* … */ {
        let item_option = some_stream.next().await;
        /* … */
    }
}

Would it be possible to create such a macro?

By the way, dealing with pinning and unpinning reminds me of my following comment in another thread:

Dealing with Streams suddenly is a case where I have to understand pinning/unpinning as a "user" of Rust. I wonder why this feels so complex:

  • Is it because there are no keywords or designated syntax constructs for pin/unpin (i.e. because pinning and unpinning is implemented "on top" of Rust through traits, macros, and smart pointers hiding their value from the user, which ultimately leads to even more complex issues such as projections)?
  • Is it inherently complex (same as having to understand by-value vs & vs &mut)?
1 Like

Thinking again about it, maybe the pin!(x) or pin_mut!(x) syntax in the function's body isn't soooo bad, as what is stored in x is pinned afterwards (and won't ever be unpinned again), so it's sort of a modification of what's stored in x. A function might also not want to pin the argument in every case (but sometimes pass it on unpinned, for example). Therefore, pin! might only happen in an if-branch inside the function's body.

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.