Hello! I want to write a function that takes a stream and returns another stream, both of type impl Stream<Item = Result<Bytes, Error>>. The main complication is that the input stream must be processed by a function that only accepts a stream of type impl Stream<Item = Bytes>, so the errors have to be filtered out of the input stream. If an error does occur, though, I would still like to know about it as soon as possible.
My idea was to create two channels: one to forward the Ok items to the processing function, and the other to carry the result stream.
Here's the code:
use futures::SinkExt;
use futures::StreamExt;
use futures::TryStreamExt;
use futures::stream;
use futures::Stream;
use bytes::Bytes;
fn some_work(
    mut input_stream: impl Stream<Item = Result<Bytes, &'static str>> + Send + 'static + Unpin,
) -> impl Stream<Item = Result<Bytes, &'static str>> {
    // Channel carrying the Ok items to process_bytes.
    let (mut tx_input, rx_input) = futures::channel::mpsc::channel(0);
    // Channel carrying the results (and any input error) to the caller.
    let (mut tx_output, rx_output) = futures::channel::mpsc::channel(0);
    let mut tx_output_clone = tx_output.clone();

    // Task 1: forward Ok items; on the first Err, report it and stop.
    tokio::spawn(async move {
        while let Some(item) = input_stream.next().await {
            if let Ok(item) = item {
                // unwrap() panics if the receiving side has been dropped.
                tx_input.send(item).await.unwrap();
            } else {
                tx_output.send(Err("Issue with input stream")).await.unwrap();
                break;
            }
        }
    });

    // Task 2: run the processing function and forward its results.
    tokio::spawn(async move {
        let mut processed_stream = process_bytes(rx_input);
        while let Some(item) = processed_stream.next().await {
            tx_output_clone.send(item).await.unwrap();
        }
    });

    rx_output
}

fn process_bytes(
    input_stream: impl Stream<Item = Bytes>,
) -> impl Stream<Item = Result<Bytes, &'static str>> {
    input_stream.map(|item| Ok(item))
}

#[tokio::main]
async fn main() -> Result<(), &'static str> {
    let input_stream = stream::iter(vec![
        Ok(Bytes::from("1")),
        Err("Input error"),
    ]);
    let output_stream = some_work(input_stream);
    let result = output_stream.try_collect::<Vec<_>>().await?;
    println!("{:?}", result);
    Ok(())
}
This works, but I was wondering whether there is a better and more performant way to do the above.
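For comparison, here is a minimal channel-free sketch of the same idea, written with plain std iterators so that it runs without any crates. It is a synchronous analogue, not a drop-in replacement: Vec<u8> stands in for Bytes, and process_items and some_work_sync are hypothetical names standing in for process_bytes and some_work. The first Err is parked in a shared slot, the inner iterator ends there, and the parked error is yielded once the processed items have drained. A similar shape might be expressible with StreamExt combinators, but I have not verified that.

```rust
use std::cell::RefCell;
use std::rc::Rc;

// Hypothetical stand-in for process_bytes: it only accepts plain items.
fn process_items(
    input: impl Iterator<Item = Vec<u8>>,
) -> impl Iterator<Item = Result<Vec<u8>, &'static str>> {
    input.map(Ok)
}

// Hypothetical synchronous analogue of some_work, without channels or tasks.
fn some_work_sync(
    input: impl Iterator<Item = Result<Vec<u8>, &'static str>>,
) -> impl Iterator<Item = Result<Vec<u8>, &'static str>> {
    // Shared slot that remembers the first input error, if any.
    let slot: Rc<RefCell<Option<&'static str>>> = Rc::new(RefCell::new(None));
    let writer = Rc::clone(&slot);

    // Pass Ok items through; on the first Err, store it and end the iterator.
    let oks = input.map_while(move |item| match item {
        Ok(chunk) => Some(chunk),
        Err(e) => {
            *writer.borrow_mut() = Some(e);
            None
        }
    });

    // Once the processed items are exhausted, yield the stored error (at most once).
    let tail = std::iter::from_fn(move || {
        let stored = slot.borrow_mut().take();
        stored.map(Err::<Vec<u8>, _>)
    });

    process_items(oks).chain(tail)
}

fn main() {
    let input = vec![Ok(b"1".to_vec()), Err("Input error"), Ok(b"2".to_vec())];
    let output: Vec<_> = some_work_sync(input.into_iter()).collect();
    println!("{:?}", output);
}
```

Two caveats with this sketch: the error only surfaces after the items that preceded it have been processed, and Rc<RefCell<...>> is not Send, so a stream version would presumably need something like Arc<Mutex<...>> instead.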
Thanks for any comments or suggestions.