Trait for Stream needs buffer_unordered to make it work

Hello,

Could you please tell me why I need to add .buffer_unordered(n) to make this work? My understanding is that map also returns a Stream...

Also, is there a best practice for using Stream in a trait? If I were working with Iterator, I would implement it for IntoIterator instead of Iterator, but IntoStream does not exist...

Thank you.

// Stream is needed for the trait bound below; StreamExt provides next(), map(), etc.
use futures::stream::{self, Stream, StreamExt};
trait FooBar {
    async fn do_something(self) -> Result<Vec<String>, String>;
}
impl<T> FooBar for T
where
    T: Stream<Item = Result<String, String>> + std::marker::Unpin,
{
    async fn do_something(mut self) -> Result<Vec<String>, String> {
        let mut out = Vec::new();
        // just a simple example
        while let Some(i) = self.next().await {
            out.push(i?);
        }
        Ok(out)
    }
}
async fn abcdef() -> Result<Vec<String>, String> {
    let out: Vec<Result<String, String>> = vec![Ok("a".to_owned()), Ok("b".to_owned())];

    // works
    // stream::iter(out).map(|inp| async { inp }).buffer_unordered(8).do_something().await

    // does not work
    stream::iter(out)
        // just a simple example
        .map(|inp| async { inp })
        .do_something()
        .await
}

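stream::iter(out).map(|inp| async { inp }) is a stream of Future values: each item is a future that hasn't been awaited yet and thus hasn't been evaluated. Its Item type is therefore not Result<String, String>, so the bound on your blanket impl is not satisfied and do_something() is not available. Adding buffer_unordered(n) gives you a stream of the results of evaluating those futures (up to n at a time, yielded in completion order). A simpler alternative would be to use then() instead of map().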
