Trying to use `try_next` with explicit `tokio::pin` for `impl TryStream`

Can somebody please explain to me why this does not work, I know how to fix it, I just can't understand why this version does not work?

use anyhow::Error;
use futures::{stream, Stream, StreamExt, TryStream, TryStreamExt};

// WORKS:
// fn stream() -> impl Stream<Item = Result<i32, Error>> {
// DOES NOT WORK:
fn stream() -> impl TryStream<Ok = i32, Error = Error> {
    stream::once(async { Ok(42) })
}

#[tokio::main]
async fn main() -> Result<(), Error> {
    let stream = stream(); // ALSO WORKS: if added .into_stream();
    tokio::pin!(stream);
    while let Some(val) = stream.try_next().await? {
        println!("{}", val);
    }
    Ok(())
}

Playground
It complains about missing pin even though it is explicitly pinned.
Thank you!

I believe this might be due to the fact that TryStream has a too general Stream supertrait bound that doesn't specify the relation of TryStream::Ok/Err to Stream::Item, which means that the impl TryStream will not be known to be Stream<Item = Result<...>>. Then this Stream impl will propagate from T = impl TryStream over to stream: Pin<&mut T> which is thus not known to implement TryStream itself.

I'm currently in mobile, so it's hard to test much; I'm not sure if the lack of Item= specification in the supertrait bound in the definition of the TryStream trait wouldn't be supported by rustc, or perhaps by their MSRV, or what other reason there might be why it's missing; I also haven't tested (yet) whether adding an explit Item = Result<i32, Error> to your impl TryStream type (or perhaps simply replacing it with impl Stream<Item = Result<i32, Error>> actually makes the error go away as I'd expect.

Edit: Oh, I completely overlooked that you already noted that an impl Stream bound does work. Well, that confirms my theory, I guess.

Edit2: Looking at the other remark, it seems as if into_stream exists precisely to work around this limitation, too, judging by its documentation.

Edit3: I just recalled that I did already ask for more context about this in an issue at some point. See: TryStream and compiler “limitations” · Issue #2508 · rust-lang/futures-rs · GitHub .

Thank you!
Indeed adding Item=Result<... also fixes the issue. Though I'm still a bit baffled by the compiler error

the trait `Unpin` is not implemented for `from_generator::GenFuture<[static generator@src/main.rs:8:24: 8:34]>`

I would expect it to be: something not implementing Future

Ah. No, that's easy to explain. There's a DerefMut implementation for Pin<&mut T> so method resolution will auto-deref from the &mut Pin<&mut impl TryStream> to &mut impl TryStream and then find the try_next method on that and desugar to calling it.

The DerefMut implementation for Pin<&mut T> does however require T: Unpin, hence the error message. Edit: Actually, I think it's not even that necessary that we're running into here (yet), instead the try_next method itself requires Self: Unpin (which usually isn't a problem because Self can be Pin<&mut T> for a non-Unpin type T).

(At least that's what I think might be happening.)

Well but, isn't the same logic applied to this code when we have Stream instead of TryStream, how we end up with missing Unpin for TryStream and it is just fine for Stream. Code explicitly pins stream to the stack in both cases

In the other case, the compiler sees the implementation of TryStream for Pin<&mut impl Stream<Item = Result<...>> itself, so method resolution will not try any dereferencing and finds a try_next method on the original type right away.

To learn more about the method resolution algorithm, see: Method call expressions - The Rust Reference

Thank you this exactly what I was missing:
Stream has this impl

impl<P> Stream for Pin<P>
where
    P: DerefMut + Unpin,
    P::Target: Stream,

and we do not have similar impl for TryStream, then compiler tries dereference it with DerefMut it succeeds in getting impl TryStream but looses Unpin in the process.

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.