I am working on a crate that accepts streams from users and typically processes them into another type of stream. For example, accepting a stream of file paths and returning a stream of file sizes.
Working example (pretending errors don't exist)
use futures::stream::iter;
use futures::stream::BoxStream;
use futures::StreamExt;

struct FilesHandler {}

impl FilesHandler {
    // Maps a stream of paths to a stream of sizes. The string length is a
    // stand-in for a real file-size lookup.
    fn file_sizes<'a>(&'a self, files: BoxStream<'a, String>) -> BoxStream<'a, u64> {
        files.map(|s| s.len() as u64).boxed()
    }

    fn list_files(&self) -> BoxStream<'_, String> {
        iter(["abc", "xyz", "a"].map(|s| s.to_owned())).boxed()
    }
}

async fn use_file_handler() {
    let handler = FilesHandler {};
    let total_size = handler
        .file_sizes(handler.list_files())
        .fold(0, |acc, x| async move { acc + x })
        .await;
    println!("The total size of all files is {}", total_size);
}

#[tokio::main(flavor = "current_thread")]
async fn main() {
    use_file_handler().await;
}
Unfortunately, errors do exist, and this is where things start to get strange. Generally we are OK, since we have TryStream and TryStreamExt. However, try streams are somewhat contagious. There is no way that I am aware of to convert a stream of results (e.g. dyn Stream<Item = Result<T>>) into a stream of values (e.g. dyn Stream<Item = T>). As a result, the only way file_sizes can accept the value returned from list_files is for file_sizes to accept a stream of results.
Working (but awkward) example that handles errors
use futures::stream::iter;
use futures::stream::BoxStream;
use futures::StreamExt;
use futures::TryStreamExt;

#[derive(Debug)]
struct LibError {}

struct FilesHandler {}

impl FilesHandler {
    // Same as before, but every item (input and output) is now a Result.
    fn file_sizes<'a>(
        &'a self,
        files: BoxStream<'a, Result<String, LibError>>,
    ) -> BoxStream<'a, Result<u64, LibError>> {
        // map_ok transforms Ok values and passes Err values through unchanged.
        files.map_ok(|s| s.len() as u64).boxed()
    }

    fn list_files(&self) -> BoxStream<'_, Result<String, LibError>> {
        iter(["abc", "xyz", "a"].map(|s| Ok(s.to_owned()))).boxed()
    }
}

async fn use_file_handler() -> Result<(), LibError> {
    let handler = FilesHandler {};
    // try_fold short-circuits on the first Err.
    let total_size = handler
        .file_sizes(handler.list_files())
        .try_fold(0, |acc, x| async move { Ok(acc + x) })
        .await?;
    println!("The total size of all files is {}", total_size);
    Ok(())
}

#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), LibError> {
    use_file_handler().await?;
    Ok(())
}
Why is this awkward?
Everything above works and makes sense as is. However, it starts to get strange when you consider that users might generate a stream of paths on their own, without using the list_files method. For example:
fn get_my_files() -> BoxStream<'static, Result<String, AppError>> {...}
Here, AppError is an application-level error that exists in the user's application. They aren't going to use LibError for errors generated by their own functions. But now they cannot easily call file_sizes. They could use err_into if they implement:
impl From<AppError> for LibError {
...
}
However, this is all very awkward:
- It's awkward that file_sizes accepts a stream of Result<T> in the first place. Why should a method processing a stream care about failed items?
- It's awkward that the user now has to create LibError from AppError. It's very common for an application to wrap library errors (e.g. impl From<LibError> for AppError), but very strange to go in the other direction.
Is there something I am missing? Or is this just an oddity of using streams that I must accept at the moment?