I am trying to implement a computational cluster in which part of the computation is done in a MapReduce manner. However, I am struggling to find an idiomatic way to fold the results from multiple nodes into a single result.
Below is a simple example of the logic I'm trying to achieve:
    use async_trait::async_trait;
    use futures::{StreamExt, TryStreamExt};

    #[async_trait]
    trait Sum {
        async fn sum(&self) -> Result<f64, ()>;
    }

    struct Node;

    #[async_trait]
    impl Sum for Node {
        async fn sum(&self) -> Result<f64, ()> {
            unimplemented!("no matter how we get this value")
        }
    }

    struct Cluster {
        nodes: Vec<Node>,
    }

    #[async_trait]
    impl Sum for Cluster {
        async fn sum(&self) -> Result<f64, ()> {
            futures::stream::iter(&self.nodes)
                .map(|node| node.sum())
                .buffer_unordered(self.nodes.len())
                .try_fold(0.0, |acc, v| async move { Ok(acc + v) })
                .await
        }
    }
The problem I'm trying to solve is trivial for languages like Java/Scala, but here, with Rust, I'm completely stuck, probably due to lack of experience.
I can't help you with the error message you're getting, as I've never seen it before and don't usually work with async_trait, which seems to me to be the origin of the error. I can, however, make your example compile with code that I think is readable:
    use async_trait::async_trait;
    use futures::future::join_all;

    #[async_trait]
    trait Sum {
        async fn sum(&self) -> Result<f64, ()>;
    }

    struct Node;

    #[async_trait]
    impl Sum for Node {
        async fn sum(&self) -> Result<f64, ()> {
            unimplemented!("no matter how we will get this value")
        }
    }

    struct Cluster {
        nodes: Vec<Node>,
    }

    #[async_trait]
    impl Sum for Cluster {
        async fn sum(&self) -> Result<f64, ()> {
            join_all(self.nodes.iter().map(|n| n.sum()))
                .await
                .into_iter()
                .try_fold(0.0, |acc, s| Ok(acc + s?))
        }
    }
Instead of using streams, I use join_all to wait for each future to complete. The results are gathered into a Vec<Result<f64, ()>>, on which folding is trivially done synchronously.
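To isolate the synchronous half of that: once join_all has produced the Vec<Result<f64, ()>>, the fold needs nothing async at all. Here is a minimal std-only sketch of that step. The function names fold_results and sum_results are made up for illustration; the second one relies on the standard library's Sum implementation for Result, which also stops at the first Err.

```rust
/// Fold the per-node results with `try_fold`; the `?` inside the closure
/// makes the fold stop at the first `Err`.
fn fold_results(results: Vec<Result<f64, ()>>) -> Result<f64, ()> {
    results.into_iter().try_fold(0.0, |acc, r| Ok(acc + r?))
}

/// Equivalent formulation: `Result<f64, ()>` implements `Sum<Result<f64, ()>>`,
/// so `sum()` can collect directly into a `Result`.
fn sum_results(results: Vec<Result<f64, ()>>) -> Result<f64, ()> {
    results.into_iter().sum()
}

fn main() {
    let all_ok: Vec<Result<f64, ()>> = vec![Ok(1.0), Ok(2.5), Ok(3.5)];
    let one_failed: Vec<Result<f64, ()>> = vec![Ok(1.0), Err(()), Ok(3.5)];

    assert_eq!(fold_results(all_ok.clone()), Ok(7.0));
    assert_eq!(sum_results(all_ok), Ok(7.0));

    // A single failed node turns the whole result into an error.
    assert_eq!(fold_results(one_failed.clone()), Err(()));
    assert_eq!(sum_results(one_failed), Err(()));
}
```

Which of the two reads better is a matter of taste; try_fold generalizes to reductions other than addition.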
One thing I would note: while you can use async runtimes for computational work, they're not optimized for it. They're designed to keep CPU occupancy up and reduce thread thrashing when doing I/O.
For computational parallelism, the de facto standard crate is rayon, which makes these sorts of examples trivial.
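To give a rough idea of what CPU-bound fan-out and fold looks like without an async runtime, here is a sketch using only std::thread::scope, so it does not depend on any crate. The work function and the chunking scheme are assumptions made up for the example. With rayon, the body of parallel_sum would collapse to something like inputs.par_iter().map(|&x| work(x)).sum::<f64>(), and rayon would handle work distribution itself.

```rust
use std::thread;

/// Hypothetical CPU-bound computation for a single input.
fn work(x: f64) -> f64 {
    x * x
}

/// Split `inputs` into at most `workers` chunks, compute a partial sum for
/// each chunk on its own scoped thread, then add the partial sums together.
fn parallel_sum(inputs: &[f64], workers: usize) -> f64 {
    let workers = workers.max(1);
    // Ceiling division; `max(1)` because `chunks(0)` would panic.
    let chunk_len = ((inputs.len() + workers - 1) / workers).max(1);

    thread::scope(|s| {
        // Scoped threads may borrow `inputs` because the scope joins them
        // before returning.
        let handles: Vec<_> = inputs
            .chunks(chunk_len)
            .map(|chunk| s.spawn(move || chunk.iter().map(|&x| work(x)).sum::<f64>()))
            .collect();

        handles
            .into_iter()
            .map(|h| h.join().expect("worker thread panicked"))
            .sum::<f64>()
    })
}

fn main() {
    let inputs = [1.0, 2.0, 3.0, 4.0];
    assert_eq!(parallel_sum(&inputs, 2), 30.0);
}
```

Note that this covers only local, CPU-bound work; it says nothing about waiting on remote nodes.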
No, that doesn't work; it doesn't change anything. The lifetime solver has a problem with the future doing the buffering because it holds a reference to self.
Well, Rust is all about values. If you have a borrowing error, then an obvious solution might be not to borrow in the first place. It's often easier to work with values rather than references while you are a beginner, because there is no indirection to worry about.
That said, I have no idea how to make the borrowing version compile. I tried the by-value version because I suspected it would work. But I'm not willing to find out exactly what the desugaring of async_trait is, and without that, I don't immediately know what's wrong with the borrowing version, either.
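For anyone who wants a feel for the desugaring without expanding the macro: according to the async_trait README, each async fn in the trait becomes an ordinary fn returning a pinned, boxed future that carries a Send bound and a lifetime tied to the borrowed arguments. Below is a hand-written approximation using only std. The BoxFuture alias, the Node(f64) payload, NoopWake and block_on are all made up for illustration, and the real expansion names its lifetimes differently and adds where clauses. The Cluster here awaits its nodes one after another to keep the sketch short, so it does not reproduce the buffer_unordered error; it only shows where the Send bound lives.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Roughly the return type #[async_trait] generates: boxed, pinned, `Send`,
// and bounded by the lifetime of the `&self` borrow.
type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;

trait Sum {
    fn sum<'a>(&'a self) -> BoxFuture<'a, Result<f64, ()>>;
}

// Hypothetical node that already knows its value.
struct Node(f64);

impl Sum for Node {
    fn sum<'a>(&'a self) -> BoxFuture<'a, Result<f64, ()>> {
        Box::pin(async move { Ok::<f64, ()>(self.0) })
    }
}

struct Cluster {
    nodes: Vec<Node>,
}

impl Sum for Cluster {
    fn sum<'a>(&'a self) -> BoxFuture<'a, Result<f64, ()>> {
        // Everything held across an `.await` must be `Send` for this box
        // to satisfy the `+ Send` bound in `BoxFuture`.
        Box::pin(async move {
            let mut total = 0.0;
            for node in &self.nodes {
                total += node.sum().await?; // sequential, for brevity
            }
            Ok::<f64, ()>(total)
        })
    }
}

// Minimal busy-polling executor so the sketch runs without a runtime.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return value;
        }
        std::thread::yield_now();
    }
}

fn main() {
    let cluster = Cluster { nodes: vec![Node(1.5), Node(2.5)] };
    assert_eq!(block_on(cluster.sum()), Ok(4.0));
}
```

Dropping `+ Send` from the alias corresponds to writing #[async_trait(?Send)], which the crate documents for futures that do not need to be Send. Whether that is acceptable depends on whether the executor moves futures between threads.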
I'm familiar with rayon and use it too, but the two serve different purposes. Rayon cannot replace asynchronous programming for I/O-bound workloads, where you are waiting for a computation on another device.
which can then be made to compile by removing the + ::core::marker::Send bound[1]. What? Makes no sense at all, right? The error mainly said something about “lifetime”, not “Send”. Either there is no reason why Send should be problematic here in the first place, in which case the code should compile, or, if there is a problem, the compiler needs to point out the offending type that precludes the Send bound and where in the code it comes from.
[1] which is why I made it no longer a trait impl, so we can change the signature