How to rewrite my code for functional programming?

initial code:

let mut res_bool = Vec::new();
let mut tasks = Vec::new();
for node in nodes.iter()
{
    tasks.push(tokio::spawn(check_node(node.link.clone(),node.title.clone())));
}
for task in tasks{
    // let res = task.await?.unwrap();
    res_bool.push(task.await?.unwrap());
}

I want to rewrite it in a functional style:

let tasks = nodes.iter()
.map(|node| {tokio::spawn(check_node(node.link.clone(),node.title.clone()))} )
.collect::<Vec<_>>();

let res_bool = tasks
.into_iter()
.map( |f| async{f.await.unwrap()})
.collect::<Vec<_>>();

But it failed.
How can I fix it? Thanks a lot!

How about using FuturesOrdered to get your res_bool?

use futures::stream::{FuturesOrdered, TryStreamExt};
use tokio::task::JoinError;

let tasks = nodes
    .iter()
    .map(|node| tokio::spawn(check_node(node.link.clone(), node.title.clone())))
    .collect::<Vec<_>>();

let res_bool: Result<Vec<bool>, JoinError> =
    FuturesOrdered::from_iter(tasks).try_collect().await;

Playground.

I would recommend against FuturesUnordered because of a well-known footgun: "Footgun lurking in `FuturesUnordered` and other concurrency-enabling streams" (rust-lang/futures-rs issue #2387).
It was turned into a wg-async status quo story: "Barbara battles buffered streams".
Also, here is a relevant blog post from notgull (smol maintainer): https://notgull.net/futures-concurrency-in-smol/

In this case, to achieve something identical to the initial code, you could turn the iterator into a stream, and then use the stream combinators from futures::StreamExt instead of iterator combinators.
See StreamExt::then.

Oops, accidentally linked FuturesUnordered instead of FuturesOrdered. Fixed the link. Then again, FuturesOrdered also concurrently polls the stream AFAIK, so the same footgun applies.

This is accurate. Just to be sure, I adapted the demo found in the issue to use FuturesOrdered, and indeed observed the same undesirable behavior: Rust Playground

Just going by the first link, I don't believe the footgun applies in this case, as golovo's code and jofas's proposed fix both use tokio::spawn(), with the FuturesOrdered being a stream of JoinHandles. Thus, the check_node() tasks are constantly being polled by the tokio executor, and the FuturesOrdered only polls JoinHandles, which should suffer no negative consequences (aside from excess memory usage) from long delays between polls.

Indeed, @golovo's code would not be affected. I should have made it more obvious in my initial post, but my intention was solely to recommend against using FuturesUnordered / FuturesOrdered, without implying that the suggested fix was broken (as-is, it's not). My reasoning is that even though it is possible to use these types correctly, doing so is notoriously error-prone, and since tokio::spawn is already used anyway, FuturesOrdered offers no benefit over StreamExt::then. (Even if the code didn't use tokio::spawn, I would still suggest replacing FuturesOrdered with StreamExt::then.)

To support my point, here is a version using StreamExt::then (adapted from @jofas's code):

    use futures::stream::{StreamExt, TryStreamExt};
    use tokio::task::JoinError;

    let tasks = nodes
        .iter()
        .map(|node| tokio::spawn(check_node(node.link.clone(), node.title.clone())))
        .collect::<Vec<_>>();

    let res_bool: Result<Vec<bool>, JoinError> = futures::stream::iter(tasks)
        .then(|handle| async { handle.await })
        .try_collect()
        .await;

The other interesting benefit in this case is that it’s actually equivalent to the initial code that golovo is attempting to rewrite (modulo error handling).

Thanks for your help. :grin:
In the end, though, I think the initial code is more readable.

Thanks for your help! :grin:

My final code:

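use futures::future;

// check_node must return anyhow::Result<bool> for the final collect() to type-check.
async fn check_nodes(nodes: &[ExcelNode]) -> anyhow::Result<Vec<bool>> {
    // Spawn one task per node so that all checks run concurrently.
    let tasks = nodes
        .iter()
        .map(|node| tokio::spawn(check_node(node.link.clone(), node.title.clone())))
        .collect::<Vec<_>>();

    future::join_all(tasks)
        .await
        .into_iter()
        // Note: flatten() skips every Err(JoinError), i.e. tasks that panicked or were cancelled.
        .flatten()
        .collect()
}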
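
One detail worth knowing about the `.flatten()` call above: flattening an iterator of `Result`s silently drops every `Err`, whereas collecting directly into a `Result` stops at the first `Err` and returns it. The sketch below shows the difference with the standard library only; the function names are hypothetical and `String` stands in for JoinError.

```rust
/// Mirrors `.into_iter().flatten().collect()`: each `Err` is discarded
/// and only the `Ok` values are kept.
fn flatten_results(results: Vec<Result<bool, String>>) -> Vec<bool> {
    results.into_iter().flatten().collect()
}

/// Collecting an iterator of `Result`s into a `Result` short-circuits:
/// the first `Err` encountered becomes the return value.
fn collect_results(results: Vec<Result<bool, String>>) -> Result<Vec<bool>, String> {
    results.into_iter().collect()
}
```

If a panicked task should surface as an error rather than disappear from the output, the second form (or an explicit `?` on each joined result) may be the better fit.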