How to fold multiple async results concurrently?

Hi,

I am trying to implement a computational cluster in which part of the computation can be done in a MapReduce manner. However, I am struggling to find an idiomatic way to fold the computational results from multiple nodes into a single result.

Below is a simple example of the logic I'm trying to achieve:

use async_trait::async_trait;
use futures::{StreamExt, TryStreamExt};

#[async_trait]
trait Sum {
    async fn sum(&self) -> Result<f64, ()>;
}

struct Node;

#[async_trait]
impl Sum for Node {
    async fn sum(&self) -> Result<f64, ()> {
        unimplemented!("no matter how we get this value")
    }
}

struct Cluster {
    nodes: Vec<Node>,
}

#[async_trait]
impl Sum for Cluster {
    async fn sum(&self) -> Result<f64, ()> {
        futures::stream::iter(&self.nodes)
            .map(|node| node.sum())
            .buffer_unordered(self.nodes.len())
            .try_fold(0.0, |acc, v| async move { Ok(acc + v) })
            .await
    }
}

(Playground)

I tried multiple approaches, but cannot find one that meets two criteria: it looks good and it compiles. Compile errors from the solution above:

   Compiling playground v0.0.1 (/playground)
error: higher-ranked lifetime error
  --> src/lib.rs:24:44
   |
24 |       async fn sum(&self) -> Result<f64, ()> {
   |  ____________________________________________^
25 | |         futures::stream::iter(&self.nodes)
26 | |             .map(|node| node.sum())
27 | |             .buffer_unordered(self.nodes.len())
28 | |             .try_fold(0.0, |acc, v| async move { Ok(acc + v) })
29 | |             .await
30 | |     }
   | |_____^
   |
   = note: could not prove `Pin<Box<impl futures::Future<Output = Result<f64, ()>>>>: CoerceUnsized<Pin<Box<(dyn futures::Future<Output = Result<f64, ()>> + std::marker::Send + 'd)>>>`

error: could not compile `playground` due to previous error

The problem I'm trying to solve is trivial in languages like Java/Scala, but here, with Rust, I'm completely stuck, probably due to lack of experience.

I can't help you with the error message you're getting, as I've never seen it before and don't usually work with async_trait, which seems to me to be the origin of the error message. I can, however, make your example compile with code that I think is readable:

use async_trait::async_trait;
use futures::future::join_all;

#[async_trait]
trait Sum {
    async fn sum(&self) -> Result<f64, ()>;
}

struct Node;

#[async_trait]
impl Sum for Node {
    async fn sum(&self) -> Result<f64, ()> {
        unimplemented!("no matter how we will get this value")
    }
}

struct Cluster {
    nodes: Vec<Node>,
}

#[async_trait]
impl Sum for Cluster {
    async fn sum(&self) -> Result<f64, ()> {
        join_all(self.nodes.iter().map(|n| n.sum()))
            .await
            .into_iter()
            .try_fold(0.0, |acc, s| Ok(acc + s?))
    }
}

Instead of using streams, I use join_all to wait for all futures to complete. The results are gathered into a Vec<Result<f64, ()>>, on which folding is trivially done synchronously.
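To make the synchronous folding step concrete, here is a minimal std-only sketch of just that part. The function name fold_results is hypothetical and not from the code above; it assumes the results have already been collected into a Vec, the way join_all returns them.

```rust
/// Folds already-collected results into one sum.
/// The fold stops at the first `Err` and returns it.
/// `fold_results` is a hypothetical helper name used only for this sketch.
fn fold_results(results: Vec<Result<f64, ()>>) -> Result<f64, ()> {
    results
        .into_iter()
        // `s?` returns early from the closure when `s` is an `Err`,
        // which makes `try_fold` stop and yield that error.
        .try_fold(0.0_f64, |acc, s| -> Result<f64, ()> { Ok(acc + s?) })
}
```

Note that only the fold stops early here. join_all itself still waits for every future to finish before the Vec exists, so a failing node does not cut the waiting short.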

Edit: Playground

You can also go the by-value route: Playground.

use async_trait::async_trait;
use futures::{StreamExt, TryStreamExt};

#[async_trait]
trait Sum {
    async fn sum(self) -> Result<f64, ()>;
}

struct Node;

#[async_trait]
impl Sum for Node {
    async fn sum(self) -> Result<f64, ()> {
        unimplemented!("no matter how we will get this value")
    }
}

struct Cluster {
    nodes: Vec<Node>,
}

#[async_trait]
impl Sum for Cluster {
    async fn sum(self) -> Result<f64, ()> {
        let len = self.nodes.len();
        futures::stream::iter(self.nodes)
            .map(|node| node.sum())
            .buffer_unordered(len)
            .try_fold(0.0, |acc, item| async move { Ok(acc + item) })
            .await
    }
}

From your snippet I assume the .buffer_unordered(self.nodes.len()) is causing the error message, which can be mitigated with this binding?

That error looks awful and terrifying.

One thing I would note is that while you can use async runtimes for computational work, they're not optimized for it; rather, they're designed to keep CPU occupancy up and reduce thread thrashing when doing I/O.

For computational parallelism, the de facto standard crate is rayon, which makes these sorts of examples trivial.
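As a rough illustration of CPU-bound parallel summing, here is a std-only sketch that uses scoped threads instead of rayon, so that it has no dependencies. The function name parallel_sum and the workers parameter are assumptions made for this example; with rayon, the manual chunking would be replaced by its parallel iterators.

```rust
use std::thread;

/// Sums `values` by splitting them into at most `workers` chunks
/// and summing each chunk on its own scoped thread.
/// `parallel_sum` and `workers` are hypothetical names for this sketch.
fn parallel_sum(values: &[f64], workers: usize) -> f64 {
    if values.is_empty() {
        return 0.0;
    }
    let workers = workers.max(1);
    // Ceiling division, so every element lands in some chunk.
    let chunk_len = (values.len() + workers - 1) / workers;

    thread::scope(|scope| {
        // Spawn all workers first, then join them, so the chunks run in parallel.
        let handles: Vec<_> = values
            .chunks(chunk_len)
            .map(|chunk| scope.spawn(move || chunk.iter().sum::<f64>()))
            .collect();

        handles
            .into_iter()
            .map(|handle| handle.join().expect("worker thread panicked"))
            .sum::<f64>()
    })
}
```

This only makes sense when the per-element work is CPU-bound. It does not help when each value comes from waiting on another machine.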


Thank you. Such solutions make my Rust programming experience absolutely incomparable to other programming languages. ))

No, that doesn't work, it doesn't change anything. The lifetime solver has a problem with the future doing the buffering because it has a reference to self.


Well, Rust is all about values. If you have a borrowing error, then an obvious solution might be not to borrow in the first place. It's often easier to work with values rather than references while you are a beginner, because there is no indirection to worry about.

That said, I have no idea how to make the borrowing version compile. I tried the by-value version because I suspected it would work. But I'm not willing to find out exactly what the desugaring of async_trait is, and without that, I don't immediately know what's wrong with the borrowing version, either.

I'm familiar with rayon and use it too; however, the two have different purposes. Rayon cannot replace asynchronous programming for I/O workloads, where you are waiting for a computation from another device.

Sure! Just wanted to make sure you're not tearing your hair out trying to remake a worse version of it!

And it is also a compiler bug, similar to issues like this one.

If you expand the macro, and make it no longer be a trait impl, you get

impl Cluster {
    fn sum<'life0, 'async_trait>(
        &'life0 self,
    ) -> ::core::pin::Pin<
        Box<
            dyn ::core::future::Future<Output = Result<f64, ()>>
                + ::core::marker::Send
                + 'async_trait,
        >,
    >
    where
        'life0: 'async_trait,
        Self: 'async_trait,
    {
        Box::pin(async move {
            if let ::core::option::Option::Some(__ret) =
                ::core::option::Option::None::<Result<f64, ()>>
            {
                return __ret;
            }
            let __self = self;
            let __ret: Result<f64, ()> = {
                futures::stream::iter(&__self.nodes)
                    .map(|node| node.sum())
                    .buffer_unordered(__self.nodes.len())
                    .try_fold(0.0, |acc, v| async move { Ok(acc + v) })
                    .await
            };

            #[allow(unreachable_code)]
            __ret
        })
    }
}

which can then be made to compile by removing the + ::core::marker::Send bound[1]. What? Makes no sense at all, right? The error mainly said something about “lifetime”, not “Send”. Either there is no reason why Send should be problematic here in the first place, in which case the code should compile, or in case there is a problem, the compiler would need to point out the offending type that precludes the Send bound, and where in the code it comes from.


  1. which is why I made it no longer a trait impl, so we can change the signature ↩︎
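For orientation, here is a hand-written, std-only sketch of the same signature shape as the expansion: a &self method returning a boxed dyn Future with a Send bound. Everything in it is an assumption made for illustration: the BoxFuture alias, Node holding an f64, and the tiny block_on helper. The body is a plain sequential loop, so it is not concurrent and does not reproduce the compiler error. It only shows that this return type, Send bound included, is fine for a simple &self body.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical alias for the boxed, `Send` future type seen in the expansion.
type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;

// Assumption for this sketch: a node simply holds a precomputed value.
struct Node(f64);

impl Node {
    fn sum<'a>(&'a self) -> BoxFuture<'a, Result<f64, ()>> {
        Box::pin(async move { Ok::<f64, ()>(self.0) })
    }
}

struct Cluster {
    nodes: Vec<Node>,
}

impl Cluster {
    fn sum<'a>(&'a self) -> BoxFuture<'a, Result<f64, ()>> {
        Box::pin(async move {
            // Sequential on purpose: each node is awaited before the next one.
            let mut acc = 0.0_f64;
            for node in &self.nodes {
                match node.sum().await {
                    Ok(value) => acc += value,
                    Err(e) => return Err(e),
                }
            }
            Ok(acc)
        })
    }
}

// Waker that does nothing; enough for futures that never return `Pending`.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Minimal busy-polling executor, only suitable for this demonstration.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}
```

Calling block_on(cluster.sum()) on a Cluster built from a few Node values drives the future to completion. In this sketch the compiler has to prove that the whole async block is Send, which it can do because the block only holds shared references to Sync data and other Send futures.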


I should have known. It's always Send.