Stream proxy handling errors

Hello! I want to write a function that takes a stream and returns another stream, both of type impl Stream<Item=Result<Bytes, Error>>. The main complication is that the input must be processed by a function that only accepts impl Stream<Item=Bytes>, so the errors have to be filtered out of the input stream, but if an error occurs I would still like to know about it as soon as possible.
My idea was to create two channels: one to send/receive the Ok items, and the other to carry the result stream.
Here's the code:

use futures::SinkExt;
use futures::StreamExt;
use futures::TryStreamExt;
use futures::stream;
use futures::Stream;
use bytes::Bytes;

fn some_work(
    mut input_stream: impl Stream<Item = Result<Bytes, &'static str >> + Send + 'static + Unpin,
) -> impl Stream<Item = Result<Bytes, &'static str>> {
    let (mut tx_input, rx_input) = futures::channel::mpsc::channel(0);
    let (mut tx_output, rx_output) = futures::channel::mpsc::channel(0);
    let mut tx_output_clone = tx_output.clone();
    tokio::spawn(async move {
        while let Some(item) = input_stream.next().await {
            if let Ok(item) = item {
                tx_input.send(item).await.unwrap();
            } else {
                tx_output.send(Err("Issue with input stream")).await.unwrap();
                break;
            }
        }
    });

    tokio::spawn(async move {
        let mut processed_stream = process_bytes(rx_input);
        while let Some(item) = processed_stream.next().await {
            tx_output_clone.send(item).await.unwrap();
        }
    });
    rx_output
}

fn process_bytes(
    input_stream: impl Stream<Item = Bytes>
) -> impl Stream<Item = Result<Bytes, &'static str>> {
    input_stream.map(|item| {
        Ok(item)
    })
}

#[tokio::main]
async fn main() -> Result<(), &'static str> {
    let input_stream = stream::iter(vec![
        Ok(Bytes::from("1")),
        Err("Input error"),
    ]);
    
    let output_stream = some_work(input_stream);
    let result = output_stream.try_collect::<Vec<_>>().await?;
    println!("{:?}", result);
    Ok(())
}

https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=78966f97a1abf3984baf8139d51e1586

This works, but I was wondering whether there is a better and more performant way to do the above.
Thanks for any comments and/or suggestions :smiley:

I think you could use filter_map to avoid needing two channels at the very least. Just filter out the error cases and send them to the error handler, and let the items pass through.

I tried to use filter_map but I don't think it applies well to my case...

One requirement that I haven't mentioned, but that the solution above satisfies, is that I'd like to stop processing the input stream at the first error.
With filter_map I lose the ability to signal when the stream should stop. Using the combinators, I came up with something like this:

    let filtered_stream = input_stream
        .then(move |x| {
            let mut tx_output_clone = tx_output.clone();
            async move {
                if x.is_err() {
                    tx_output_clone.send(Err("error".to_string())).await.unwrap();
                };
                x
            }
        })
        .take_while(|x| future::ready(x.is_ok()))
        .map(|x| x.unwrap());

Instead of then I could maybe use inspect together with the blocking send from tokio channels, but I'm not sure that's a good solution. Also notice that I have to clone the sender for each element in the stream. At least this allows me to save one channel :slight_smile:

So in general I'm not too happy with this solution, but I'll try profiling later and see which one is faster in my use case. Meanwhile, if anyone has suggestions, I'm happy to hear them.