Idiomatic way to fold "empty" Results asynchronously

Hi,

Is there a better (more idiomatic) way to reduce "empty" Results concurrently?

#![feature(async_fn_in_trait)]

use futures::{StreamExt, TryStreamExt};

struct Value;

trait Apply {
    async fn apply(&self, value: &Value) -> Result<(), ()>;
}

struct Node;

impl Apply for Node {
    async fn apply(&self, _: &Value) -> Result<(), ()> {
        unimplemented!("no matter what is here")
    }
}

struct Cluster {
    nodes: Vec<Node>,
}

impl Apply for Cluster {
    async fn apply(&self, value: &Value) -> Result<(), ()> {
        futures::stream::iter(&self.nodes)
            .map(move |node| node.apply(value))
            .buffer_unordered(self.nodes.len())
            .try_fold((), |_, _| async move { Ok(()) }) // how to rid of this line?
            .await
    }
}

Compiles without any problems on (Playground)

I just don't like .try_fold((), |_, _| async move { Ok(()) }). Who can propose anything better?

The goal is to ensure that all is Ok(()).

P.S. This time nightly build instead of async_trate crate. ))

You can use .try_collect() instead of that .try_fold(…) call. The () type actually supports being collected from a bunch of () items, both for normal iterators via impl FromIterator<()> for (), as well as for streams like this, too, via the implementation of impl Default for () and impl Extend<()> for ().

2 Likes
futures::stream::iter(&self.nodes)
    .map(Ok)
    .try_for_each_concurrent(None, |node| node.apply(value))
    .await

Edit: added .map(Ok) to make the stream TryStream

2 Likes

Cannot compile by some reason: (Playground)

Besides missing the use futures::TryStreamExt; in this playground, being a TryStreamExt method, it needs the stream we’re starting with to be a stream of Results already… so something like

futures::stream::iter(&self.nodes)
    .map(Ok)
    .try_for_each_concurrent(None, |node| node.apply(value))
    .await

works, apparently.

2 Likes

Lots of these methods, like buffer_unordered or try_for_each_concurrent, use FuturesUnordered internally. Since they all have extra code reasoning about potential length limits and such, if you don’t want any of this, the most conceptionally straightforward way might be to just to skip all these convenience layers and use the FuturesUnordered primitive yourself.

Not necessarily the shortest way in code, but it’s still fairly straightforward. A FuturesUnordered can be constructed from an iterator of futures, and then it itself is a stream, so the .try_collect of Result<(), _>s trick applies, giving something like

self.nodes
    .iter()
    .map(|node| node.apply(value))
    .collect::<FuturesUnordered<_>>()
    .try_collect()
    .await

Or… if the two collect-like methods in a row seem too confusing, how about

FuturesUnordered::from_iter(self.nodes.iter().map(|node| node.apply(value)))
    .try_collect()
    .await

I’m not saying this is necessary or better than any previous approach, but I believe at least learning about the existence of FuturesUnordered (and FuturesOrdered, too, if you want to look at that) can be educational :slight_smile:

2 Likes

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.