How should I handle a stream of results on API boundaries?

I am working on a crate that accepts streams from users and typically processes them into another type of stream. For example, accepting a stream of file paths and returning a stream of file sizes.

Working example (pretending errors don't exist)
use futures::stream::iter;
use futures::stream::BoxStream;
use futures::StreamExt;

struct FilesHandler {
    
}

impl FilesHandler {
    fn file_sizes<'a>(&'a self, files: BoxStream<'a, String>) -> BoxStream<u64> {
        files.map(|s| s.len() as u64).boxed()
    }
    fn list_files(&self) -> BoxStream<String> {
        iter(["abc", "xyz", "a"].map(|s| s.to_owned())).boxed()
    }
}


async fn use_file_handler() {
    let handler = FilesHandler {};
    let total_size = handler.file_sizes(handler.list_files())
                            .fold(0, |acc, x| async move { acc + x }).await;
    println!("The total size of all files is {}", total_size)
}

#[tokio::main(flavor = "current_thread")]
async fn main() {
    use_file_handler().await;
}

Unfortunately, errors do exist, and this is where things start to get strange. In general we are fine: we have TryStream and TryStreamExt. However, try streams are somewhat contagious. As far as I am aware, there is no way to convert a stream of results (e.g. dyn Stream<Item = Result<T>>) into a stream of values (e.g. dyn Stream<Item = T>). As a result, the only way for file_sizes to accept the value returned from list_files is for file_sizes to accept a stream of results.

Working (but awkward) example that handles errors
use futures::stream::iter;
use futures::stream::BoxStream;
use futures::StreamExt;
use futures::TryStreamExt;

#[derive(Debug)]
struct LibError {
    
}

struct FilesHandler {
    
}

impl FilesHandler {
    fn file_sizes<'a>(
        &'a self,
        files: BoxStream<'a, Result<String, LibError>>,
    ) -> BoxStream<Result<u64, LibError>> {
        files.map_ok(|s| s.len() as u64).boxed()
    }
    fn list_files(&self) -> BoxStream<Result<String, LibError>> {
        iter(["abc", "xyz", "a"].map(|s| Ok(s.to_owned()))).boxed()
    }
}


async fn use_file_handler() -> Result<(), LibError> {
    let handler = FilesHandler {};
    let total_size = handler.file_sizes(handler.list_files())
                            .try_fold(0, |acc, x| async move { Ok(acc + x) }).await?;
    println!("The total size of all files is {}", total_size);
    Ok(())
}

#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), LibError>{
    use_file_handler().await?;
    Ok(())
}
Why is this awkward?

Everything above works ok and makes sense as is. However, it starts to get strange when you consider the fact that users might be generating a stream of paths on their own, without using the list_files method. For example:

fn get_my_files() -> BoxStream<Result<String, AppError>> {...}

Here, AppError is an application-level error defined in the user's application. They aren't going to use LibError for errors originating in their own functions, so now they cannot easily call file_sizes. They could use err_into if they implement:

impl From<AppError> for LibError {
  ...
}

However, this is all very awkward:

  • It's awkward that file_sizes accepts a stream of Result<T> in the first place. Why should a method processing a stream care about failed items?
  • It's awkward that the user now has to create LibError from AppError. It's very common for an application to wrap library errors (e.g. impl From<LibError> for AppError is very common) but very strange to go the other direction.

Is there something I am missing? Or is this just an oddity of using streams that I must accept at the moment?

You can convert the error:

impl FilesHandler {
    fn list_files(&self) -> BoxStream<Result<String, LibError>> {
        iter(["abc", "xyz", "a"].map(|s| Ok(s.to_owned()))).boxed()
    }
}

fn get_my_files() -> BoxStream<'static, Result<String, AppError>> {
    FilesHandler{}.list_files().map(|res| Ok(res?)).boxed()
}

Rust Playground


Update: maybe I've misread your question. For clarity, it would help if you provided code that shows where the awkwardness comes up.

Your example shows how to convert from LibError to AppError but I need to go the other way around. Here is an updated example:

use futures::stream::iter;
use futures::stream::BoxStream;
use futures::StreamExt;
use futures::TryStreamExt;

// ----------- LIBRARY CODE -----------------
#[derive(Debug)]
struct LibError {
    
}

struct FilesHandler {
    
}

impl FilesHandler {

    // Mock method that gets the size of a stream of files.  Maybe it batches
    // requests up and makes a few requests with many files.  Maybe it issues
    // many requests in parallel.  In reality this is a trait with multiple
    // implementations depending on the filesystem.
    fn file_sizes<'a>(
        &'a self,
        files: BoxStream<'a, Result<String, LibError>>,
    ) -> BoxStream<Result<u64, LibError>> {
        files.map_ok(|s| s.len() as u64).boxed()
    }
    
    // GOAL - Design file_sizes (above) so that it can accept the result of
    //        list_files and also accept streams of files from app code.
    //
    // Mock method that gets a list of files (usually from some base dir).
    fn list_files(&self) -> BoxStream<Result<String, LibError>> {
        iter(["abc", "xyz", "a"].map(|s| Ok(s.to_owned()))).boxed()
    }
}

// -------------- APPLICATION CODE --------------------

#[derive(Debug)]
struct AppError {
    
}

// In my app I'm not using list_files because I have some custom way
// to get a list of files.  I still want to use the file_sizes method.
fn custom_file_source() -> BoxStream<'static, Result<String, AppError>> {
    // In reality, maybe I'm getting these files from a catalog stored on disk
    iter(["custom", "files"]).map(|s| Ok(s.to_owned())).boxed()
}

// Two approaches to getting the total size.  One using a custom source and
// one using the list_files method from FilesHandler.  I'd like to design the
// FilesHandler::file_sizes method to support both approaches.
async fn get_total_size_from_custom_source() -> Result<(), LibError> {
    let handler = FilesHandler {};
    let my_files = custom_file_source();
    // AWKWARD API
    // I have to wrap my stream from AppError to LibError which is very awkward
    let my_files = my_files.map(|s| s.map_err(|_| LibError{})).boxed();
    let total_size = handler.file_sizes(my_files)
                            .try_fold(0, |acc, x| async move { Ok(acc + x) }).await?;
    println!("The total size of all files (custom source) is {}", total_size);
    Ok(())
}

async fn get_total_size_from_list_method() -> Result<(), LibError> {
    let handler = FilesHandler {};
    let total_size = handler.file_sizes(handler.list_files())
                            .try_fold(0, |acc, x| async move { Ok(acc + x) }).await?;
    println!("The total size of all files (list source) is {}", total_size);
    Ok(())
}

#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), LibError>{
    get_total_size_from_custom_source().await?;
    get_total_size_from_list_method().await?;
    Ok(())
}

Rust Playground

You could maybe make your code generic on error type, like this (not tested):

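fn file_sizes<'a, E>(
    &'a self,
    files: BoxStream<'a, Result<String, E>>,
) -> BoxStream<'a, Result<u64, E>>
where
    // `?` converts errors with From, so the bound goes on E.
    E: From<LibError>,
{
    // Upstream errors pass through; a LibError from get_file_size becomes an E.
    files.map(|s| -> Result<u64, E> { Ok(get_file_size(s?)?) }).boxed()
}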

fn get_file_size(file: String) -> Result<u64, LibError> {
    todo!()
}

The user would then need to define impl From<LibError> for AppError, as is usual (they would get impl Into<AppError> for LibError for free).
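To make the conversion concrete, here is a minimal std-only sketch of what that closure does for a single item. The AppError variants, the size_of_item helper, and the body of get_file_size are placeholders invented for illustration; they are not part of the code above.

```rust
#[derive(Debug, PartialEq)]
struct LibError;

#[derive(Debug, PartialEq)]
enum AppError {
    // Hypothetical variant wrapping a library error.
    Lib(LibError),
    // Hypothetical variant for errors raised by the application itself.
    Catalog,
}

// The usual direction: the application wraps the library's error.
impl From<LibError> for AppError {
    fn from(err: LibError) -> Self {
        AppError::Lib(err)
    }
}

// Placeholder: uses the path length as the size and rejects empty paths.
fn get_file_size(file: String) -> Result<u64, LibError> {
    if file.is_empty() {
        Err(LibError)
    } else {
        Ok(file.len() as u64)
    }
}

// Per-item logic of the generic file_sizes: the caller's error passes
// through unchanged and a LibError is converted with From.
fn size_of_item<E: From<LibError>>(item: Result<String, E>) -> Result<u64, E> {
    Ok(get_file_size(item?)?)
}

fn main() {
    let ok: Result<u64, AppError> = size_of_item(Ok("abc".to_owned()));
    let upstream: Result<u64, AppError> = size_of_item(Err(AppError::Catalog));
    let lib: Result<u64, AppError> = size_of_item(Ok(String::new()));
    println!("{:?} {:?} {:?}", ok, upstream, lib);
}
```

With this shape the application never has to build a LibError from an AppError; the only conversion runs from the library's error into the caller's.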


I agree with Tom's idea of making file_sizes generic over the error type, but it doesn't need the trait bound as long as file_sizes itself never produces a LibError: Rust Playground

    fn file_sizes<'a, E>(
        &'a self,
        files: impl Stream<Item = Result<String, E>>,
    ) -> impl Stream<Item = Result<u64, E>> {  // no trait bound on E
        files.map_ok(|s| s.len() as u64)
    }

Also, BoxStream is unnecessary once you use impl Stream.[1]


  1. When a BoxStream is needed somewhere, call .boxed() on the impl Stream value, as you are doing now.
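As a rough illustration of how both callers end up using the same function, here is a std-only sketch in which a synchronous Iterator stands in for Stream (an assumption made so the example has no dependencies). The free functions and their contents are stand-ins; map_ok on a stream plays the role of the inner map here.

```rust
#[derive(Debug, PartialEq)]
struct LibError;

#[derive(Debug, PartialEq)]
struct AppError;

// Iterator stand-in for the error-generic file_sizes: E is never inspected,
// so no trait bound is needed.
fn file_sizes<E>(
    files: impl Iterator<Item = Result<String, E>>,
) -> impl Iterator<Item = Result<u64, E>> {
    files.map(|item| item.map(|s| s.len() as u64))
}

// Stand-in for the library's own source.
fn list_files() -> impl Iterator<Item = Result<String, LibError>> {
    vec!["abc", "xyz", "a"].into_iter().map(|s| Ok(s.to_owned()))
}

// Stand-in for an application source with its own error type.
fn custom_file_source() -> impl Iterator<Item = Result<String, AppError>> {
    vec!["custom", "files"].into_iter().map(|s| Ok(s.to_owned()))
}

fn main() {
    // Each caller gets back its own error type, with no conversion step.
    let from_lib: Result<u64, LibError> = file_sizes(list_files()).sum();
    let from_app: Result<u64, AppError> = file_sizes(custom_file_source()).sum();
    println!("{:?} {:?}", from_lib, from_app);
}
```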


Thank you both. I agree with the idea of making the error type a generic parameter. I ended up with something slightly different but the same basic idea:

    fn file_sizes<E: std::error::Error>(
        self,
        files: impl Stream<Item = Result<String, E>>,
    ) -> impl Stream<Item = Result<u64, LibError>> {
        files
            .map_err(|err| LibError::from_source_error(&err))
            .try_filter_map(|s| future::ready(Ok(Some(s.len() as u64))))
    }
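For anyone wondering what from_source_error might look like, here is one possible sketch using only the standard library. The message field, the Display text, and the AppError stand-in are assumptions made for illustration, not the actual definitions from my crate.

```rust
use std::error::Error;
use std::fmt;

// Hypothetical LibError that keeps the caller's error as text.
#[derive(Debug, PartialEq)]
struct LibError {
    message: String,
}

impl LibError {
    // Assumed shape of from_source_error: borrow any error and record
    // its Display output.
    fn from_source_error(err: &dyn Error) -> Self {
        LibError { message: err.to_string() }
    }
}

impl fmt::Display for LibError {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "source error: {}", self.message)
    }
}

impl Error for LibError {}

// Stand-in application error.
#[derive(Debug)]
struct AppError;

impl fmt::Display for AppError {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "catalog unavailable")
    }
}

impl Error for AppError {}

fn main() {
    let converted = LibError::from_source_error(&AppError);
    println!("{}", converted);
}
```

Storing only the message loses the typed source. Keeping the original error instead (for example as a Box<dyn Error + Send + Sync>) would require taking it by value and adding Send + Sync + 'static bounds on E.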