How to use futures stream forward?

I am using futures_preview and I am puzzled by the signature of futures::stream::StreamExt::forward. Why is Self::Item constrained to Result?

I looked at the Forward implementation but did not get the point. Are there any usage examples?

The stream must yield results so that errors from either the stream or the sink can be combined into the output future. That is, you take a Stream<Item = Result<T, E>> and a Sink<SinkItem = T, SinkError = E> and get back a Future<Output = Result<_, E>>, which resolves with an error if either pulling an item from the stream or pushing an item into the sink fails.

It's unfortunate that there's no example for this adaptor, but I think if you have an infallible stream you should be able to just do something like stream.map(Ok).forward(sink) and type inference will pick the right error type for you.
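
To make the error plumbing concrete, here is a rough synchronous analogy using only the standard library. It is not the futures API: forward_sync and the push closure are made-up names standing in for forward and the sink, and an Iterator stands in for the Stream. It only illustrates why both sides share one error type E.

```rust
/// Hypothetical synchronous stand-in for `forward` (not part of any crate).
/// `source` plays the role of the stream, `push` the role of the sink.
fn forward_sync<T, E>(
    source: impl Iterator<Item = Result<T, E>>,
    mut push: impl FnMut(T) -> Result<(), E>,
) -> Result<(), E> {
    for item in source {
        // An error from the source stops forwarding and becomes the result.
        let value = item?;
        // An error from the sink is reported through the same type `E`.
        push(value)?;
    }
    Ok(())
}
```

With an infallible source such as vec![1, 2, 3].into_iter(), calling .map(Ok) first wraps each item so it fits the Result-shaped input, and E is then fixed by whatever error type the push side returns. This mirrors the stream.map(Ok).forward(sink) idea above.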

Thank you @Nemo157. Now that you have explained it, it seems so obvious! :)

Unfortunately, type inference does not help when the result has to match an explicitly named Forward type, such as a struct field:

pub struct StreamForwarder<S: Stream> {
    inner: stream::Forward<
        stream::Map<S, fn(S::Item) -> Result<S::Item, mpsc::SendError>>,
        mpsc::Sender<S::Item>,
    >,
}

So I find myself compelled to write an ugly type annotation:

    let ok_stream: stream::Map<S, fn(_) -> Result<_, mpsc::SendError>> = stream.map(Ok);
    let forwarder = StreamForwarder {
        inner: ok_stream.forward(tx),
    };
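
The same naming problem shows up with std iterators, so here is a small sketch of one way to keep the annotation in a single place. It assumes a hypothetical OkIter wrapper around std::iter::Map and a String error type chosen only for illustration; it is not the futures API. A constructor binds Ok to a function-pointer type once, so callers never write the annotation themselves.

```rust
use std::iter::Map;

// Hypothetical wrapper, analogous in shape to StreamForwarder above,
// but built on std iterators instead of streams.
struct OkIter<I: Iterator> {
    inner: Map<I, fn(I::Item) -> Result<I::Item, String>>,
}

impl<I: Iterator> OkIter<I> {
    fn new(iter: I) -> Self {
        // `Ok` is a function item with its own unnameable type; binding it
        // to a `fn` pointer here gives the field a type that can be written.
        let wrap: fn(I::Item) -> Result<I::Item, String> = Ok;
        OkIter { inner: iter.map(wrap) }
    }
}

impl<I: Iterator> Iterator for OkIter<I> {
    type Item = Result<I::Item, String>;

    fn next(&mut self) -> Option<Self::Item> {
        self.inner.next()
    }
}
```

A call such as OkIter::new(vec![1, 2, 3].into_iter()) then needs no type annotation at the call site. A constructor on StreamForwarder could presumably hide the annotation in the same way.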

It would be nice to have a ForwardInfallible combinator which would just forward items from a Stream.
