How to save time when using common data in multiple concurrent tasks?

My code is as below:

async fn process_once(web_data: Vec<f64>) {
    // A (consumes 15 seconds): lots of code here, including loading data from local files and calculations

    // B (consumes 1 second): uses `web_data`

    // C (consumes 2 seconds): other calculations
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let refer_list = vec![1, 2, 3, 4, 5];

    // here get `web_data` from web;
    let web_data = ...;
    
    let futures = FuturesOrdered::from_iter((0..refer_list.len()).map(|i| {
        process_once(web_data.clone())
    }));
    futures.try_collect().await?;
    
    Ok(())
}

If I do it like above, I only save a little time, because most of the time is consumed by block A in process_once, which could already run before web_data is received.
So how can I optimize this code?

You can wrap step A in a task and await it only when you actually need its value.

Illustration: process_once currently needs 18 s; after spawning a task for step A, the remaining work needs only 3 s (or up to 15 s when A's result is not ready yet).

In that case, you should spawn the task before fetching web_data, not after, so that A runs while the web request is in flight.


In step A, you can use the bincode crate to encode all the expensive precomputed data to the local file system. Then you can load and decode it directly into a data structure not only in steps B and C of the same process, but also in future invocations of the program.


> after spawning a task, it needs 15s or 3s

I need the result of function a in function b, and I think we can save all the time that would be consumed by a for '2, 3, 4, 5' in refer_list; for '1', we still have to spend the time running function a.

For the first solution, I find it only runs the first one:

#![allow(unused)]
use futures::stream::{FuturesOrdered, StreamExt};
use std::{time::{Duration, Instant}, thread::sleep, error::Error};

async fn process_once(web_data: Vec<f64>, x: usize) {
    // A (consumes 15 seconds): lots of code here, including loading data from local files and calculations
    println!("run_{}", x);
    a().await;
    // B (consumes 1 second): uses `web_data`
    b().await;
    // C (consumes 2 seconds): other calculations
    c().await;
}

async fn a() {
    sleep(Duration::from_millis(150));
}
async fn b() {
    sleep(Duration::from_millis(10));
}
async fn c() {
    sleep(Duration::from_millis(20));
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let refer_list = vec![1, 2, 3, 4, 5];

    // here get `web_data` from web;
    let web_data = vec![];
    
    let now = Instant::now();
    
    let futures = FuturesOrdered::from_iter((0..refer_list.len()).map(|i| {
        process_once(web_data.clone(), i)
    })).into_future();
    futures.await;
    
    dbg!(now.elapsed().as_millis());
    
    Ok(())
}

It only prints one run_0.

Don't use StreamExt::into_future here. Awaiting the StreamFuture waits only for the first element of the stream (always run_0, because we use FuturesOrdered) to become available, then returns that first element plus the rest of the (not yet finished) stream. It does not wait for the whole stream to finish, which is what you are after.

Try using StreamExt::collect to wait until your stream is finished instead:

let futures = FuturesOrdered::from_iter((0..refer_list.len()).map(|i| {
    process_once(web_data.clone(), i)
}));

futures.collect::<()>().await;

Playground.

Note that we are using FuturesOrdered here. Ordering the elements of your stream is currently unnecessary and will only make your program slower. If you don't need ordering when waiting on multiple futures with a stream, use FuturesUnordered instead.


Hi @jofas
Thanks for your reply again!

About running multiple futures with FuturesOrdered, I am still a little confused by the code below:

use futures::TryStreamExt;
use std::error::Error;
use std::{thread, time};

async fn sleep_seconds(seconds: u64) -> Result<(), Box<dyn Error>>{
    thread::sleep(time::Duration::from_millis(seconds));
    Ok(())
}

async fn run_once(
    unique_num: i64,
) -> Result<i64, Box<dyn Error>> {
    println!("lines_{}", unique_num);
    sleep_seconds(10000).await?;
    Ok(unique_num)
}


#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let unique_num_list = vec![1, 2, 3, 4];
    let futures_threads = futures::stream::FuturesOrdered::from_iter((0..unique_num_list.len()).map(|i| {
        let unique_numi = unique_num_list[i];
        run_once(unique_numi)}));
    let preds_list: Vec<i64> = futures_threads.try_collect().await?;
    dbg!(preds_list);

    Ok(())
}

When I run this code, I find that it does not run concurrently for each of [1, 2, 3, 4].
It prints
lines_1 lines_2 lines_3 lines_4 one by one, with a 10-second sleep in between; if it ran concurrently, it should print all of them at once (just like awaited web requests). So how shall I do that when using std::thread::sleep?

Don't use std::thread::sleep. It will block your tokio worker thread (not just the task!), making it unable to do other work (i.e. driving your other futures). Use tokio::time::sleep instead:

async fn sleep_seconds(seconds: u64) -> Result<(), Box<dyn Error>>{
    tokio::time::sleep(time::Duration::from_millis(seconds)).await;
    Ok(())
}

Read more on blocking in async tasks in this much-cited blog post (it describes exactly your problem with std's sleep functionality) 🙂


Thank you.


I picked .into_future() since you didn't give full runnable code. And stuff like into_future, collect, FuturesOrdered, etc. outside of process_once is not very relevant to the question, which asks how to save time running the process_once function.

Also, whether the other parts of your code are correct is unclear, because there isn't much information.

If you want to save the whole time, including running process_once multiple times, you can refactor to run in parallel. Then it takes 180 ms instead of 5 × 180 ms (which is what you get with concurrency but no parallelism, e.g. plain async with blocking steps) to finish process_once 5 times.
Rust Playground


Actually OP didn't use them. I wrote them in the example code, since no full runnable code was provided.

Your suggestions are right, and I already knew them too. I used

  • into_future: to show how long the function process_once takes to run once
  • std::thread::sleep: to imitate the A/B/C steps described by OP

The best way to reduce everyone's time spent communicating is simply to ask OP to provide runnable/reproducible/minimal working code beforehand.


This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.