Stream proxy handling errors

Hello! I want to write a function that takes a stream and returns another stream, both with type impl Stream<Item=Result<Bytes, Error>>. The main complication is that the input stream must be processed by a function which only accepts a stream of type impl Stream<Item=Bytes>, so the input stream must be filtered, but in case of an error I still would like to know asap.
My idea then was to create a couple of channels and use one to send/receive the Ok items, and the other to handle the result stream.
Here's the code:

use futures::SinkExt;
use futures::StreamExt;
use futures::TryStreamExt;
use futures::stream;
use futures::Stream;
use bytes::Bytes;

fn some_work(
    mut input_stream: impl Stream<Item = Result<Bytes, &'static str >> + Send + 'static + Unpin,
) -> impl Stream<Item = Result<Bytes, &'static str>> {
    let (mut tx_input, rx_input) = futures::channel::mpsc::channel(0);
    let (mut tx_output, rx_output) = futures::channel::mpsc::channel(0);
    let mut tx_output_clone = tx_output.clone();
    tokio::spawn(async move {
        while let Some(item) = input_stream.next().await {
            if let Ok(item) = item {
                tx_input.send(item).await.unwrap();
            } else {
                tx_output.send(Err("Issue with input stream")).await.unwrap();
                break;
            }
        }
    });

    tokio::spawn(async move {
        let mut processed_stream = process_bytes(rx_input);
        while let Some(item) = processed_stream.next().await {
            tx_output_clone.send(item).await.unwrap();
        }
    });
    rx_output
    
}

fn process_bytes(
    input_stream: impl Stream<Item = Bytes>
) -> impl Stream<Item = Result<Bytes, &'static str>> {
    input_stream.map(|item| {
        Ok(item)
    })
}

#[tokio::main]
async fn main() -> Result<(), &'static str> {
    let input_stream = stream::iter(vec![
        Ok(Bytes::from("1")),
        Err("Input error"),
    ]);
    
    let output_stream = some_work(input_stream);
    let result = output_stream.try_collect::<Vec<_>>().await?;
    println!("{:?}", result);
    Ok(())
}

https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=78966f97a1abf3984baf8139d51e1586

This works but I was wondering if there was a better and more performant way to do the above.
Thanks for any comments and/or suggestions :smiley:

I think you could use filter_map to avoid needing two channels at the very least. Just filter out the error cases and send them to the error handler, and let the items pass through.

I tried to use filter_map but I don't think it applies well to my case...

One requirement that I haven't mentioned, but is accomplished with the above solution is that i'd like to stop processing the input stream at the first error.
But with filter_map i lose the ability to communicate when to stop the stream. Using the combinators I came up with somethig like this:

    let filtered_stream = input_stream
        .then(move |x| {
            let mut tx_output_clone = tx_output.clone();
            async move {
                if x.is_err() {
                    tx_output_clone.send(Err("error".to_string())).await.unwrap();
                };
                x
            }
        })
        .take_while(|x| future::ready(x.is_ok()))
        .map(|x| x.unwrap());

instead of then I could maybe use inspect and then use the blocking send from tokio channels... not sure that's a good solution. Also notice how I have to clone the sender for each element in the stream. At least this allow me to save one channel :slight_smile:

So in general not too happy with this solution, but I'll try profiling later and see which one is faster in my usecase. Meanwhiel if anyone has suggestion, happy to hear them.

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.