[futures] Is it possible to chain two `TryStream`s?

I wanted to chain two different TryStreams producing the same Item type, and then call try_collect() on the chained stream:

fn foo_stream(id: u32) -> impl TryStream<Ok = u32, Error = Infallible> {
    futures::stream::once(async move { Ok(id) })
}

let result: Vec<_> = foo_stream(1)
    .chain(foo_stream(2))
    .try_collect()
    .await
    .unwrap();

But the above code fails to compile with a “trait bounds not satisfied” error (Rust Playground):

error[E0599]: the method `try_collect` exists for struct `futures::stream::Chain<impl TryStream<Ok = u32, Error = Infallible>, impl TryStream<Ok = u32, Error = Infallible>>`, but its trait bounds were not satisfied
  --> src/main.rs:12:10
   |
12 |           .try_collect()
   |            ^^^^^^^^^^^ method cannot be called on `futures::stream::Chain<impl TryStream<Ok = u32, Error = Infallible>, impl TryStream<Ok = u32, Error = Infallible>>` due to unsatisfied trait bounds
   |
  ::: /playground/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-util-0.3.21/src/stream/stream/chain.rs:7:1
   |
7  | / pin_project! {
           ...
17 | | }
   | | -
   | | |
   | |_doesn't satisfy `_: TryStreamExt`
   |   doesn't satisfy `_: TryStream`
   |
   = note: the following trait bounds were not satisfied:
           `futures::stream::Chain<impl TryStream<Ok = u32, Error = Infallible>, impl TryStream<Ok = u32, Error = Infallible>>: TryStream`
           which is required by `futures::stream::Chain<impl TryStream<Ok = u32, Error = Infallible>, impl TryStream<Ok = u32, Error = Infallible>>: TryStreamExt`
           `&futures::stream::Chain<impl TryStream<Ok = u32, Error = Infallible>, impl TryStream<Ok = u32, Error = Infallible>>: TryStream`
           which is required by `&futures::stream::Chain<impl TryStream<Ok = u32, Error = Infallible>, impl TryStream<Ok = u32, Error = Infallible>>: TryStreamExt`
           `&mut futures::stream::Chain<impl TryStream<Ok = u32, Error = Infallible>, impl TryStream<Ok = u32, Error = Infallible>>: TryStream`
           which is required by `&mut futures::stream::Chain<impl TryStream<Ok = u32, Error = Infallible>, impl TryStream<Ok = u32, Error = Infallible>>: TryStreamExt`

which seemed strange to me, since the Chain object is a Stream producing Result<u32, Infallible> items, which satisfies the trait bounds of TryStream.

What's going wrong here?

The TryStream trait is defined as a sub-trait of Stream, but it does not specify that the item type in the Stream trait is necessarily Result<Ok, Error>, and so the compiler will refuse to infer what the item type in the Stream trait is. Because of this, it is not able to tell that the item type of your Chain will be a result, and therefore it is not able to tell that the Chain implements TryStream. All of this is due to compiler limitations in defining traits like TryStream.

To fix this, use the return type

impl Stream<Item = Result<u32, Infallible>>

instead.

1 Like

Thanks @alice! For others who happen to be reading this post, it is also possible to call TryStreamExt::into_stream() to turn a TryStream into Stream<Item = Result>, such as:

let result: Vec<_> = foo_stream(1)
    .into_stream()
    .chain(foo_stream(2).into_stream())
    .try_collect()
    .await
    .unwrap();
1 Like

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.