async fn process_once(web_data: Vec<f64>) {
    // A (takes 15 seconds): lots of code here, including loading data from local files and running calculations on it;
    // B (takes 1 second): here `web_data` is used;
    // C (takes 2 seconds): other calculations;
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let refer_list = vec![1, 2, 3, 4, 5];
    // here `web_data` is fetched from the web;
    let web_data = ...;
    let futures = FuturesOrdered::from_iter((0..refer_list.len()).map(|i| {
        process_once(web_data.clone())
    }));
    futures.try_collect().await?;
    Ok(())
}
If I do it like the above, I only save a little time, because most of the time is consumed by block A in `process_once`, which could already run before `web_data` is received.
So how can I optimize this code?
In step A, you can use the `bincode` crate to encode all the expensive precomputed data and write it to the local file system. You can then load and decode it directly into a data structure in steps B and C, not only within the same process but also in future invocations of the program.
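A minimal sketch of that idea, assuming bincode 1.x with serde derive, and a hypothetical `Precomputed` struct standing in for whatever step A produces:

use serde::{Deserialize, Serialize};
use std::error::Error;
use std::fs;

// Hypothetical shape of the expensive data produced by step A.
#[derive(Serialize, Deserialize)]
struct Precomputed {
    values: Vec<f64>,
}

fn save_precomputed(data: &Precomputed, path: &str) -> Result<(), Box<dyn Error>> {
    // Encode with bincode and write the bytes to disk once, at the end of step A.
    let bytes = bincode::serialize(data)?;
    fs::write(path, bytes)?;
    Ok(())
}

fn load_precomputed(path: &str) -> Result<Precomputed, Box<dyn Error>> {
    // Steps B/C (and later runs of the program) read and decode instead of recomputing.
    let bytes = fs::read(path)?;
    Ok(bincode::deserialize(&bytes)?)
}

fn main() -> Result<(), Box<dyn Error>> {
    let data = Precomputed { values: vec![1.0, 2.0, 3.0] };
    save_precomputed(&data, "precomputed.bin")?;
    let reloaded = load_precomputed("precomputed.bin")?;
    assert_eq!(reloaded.values, data.values);
    Ok(())
}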
After spawning a task, it needs 15 s or 3 s.
I should use the result of function A in function B, and I think we can save all the time that would be consumed by '2, 3, 4, 5' in `refer_list`; for '1', we still have to spend the time to run function A.
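A minimal sketch of that idea, with hypothetical `block_a` and `blocks_b_and_c` functions splitting up `process_once`: block A runs only once (and can start before `web_data` arrives), so the entries after the first no longer pay for it:

use futures::stream::{FuturesOrdered, StreamExt};

// Hypothetical split of process_once: block A does not depend on web_data,
// so it can run once and even start before web_data arrives.
async fn block_a() -> Vec<f64> {
    // ... the 15 seconds of loading and precomputation ...
    vec![0.0; 8]
}

async fn blocks_b_and_c(a_result: &[f64], web_data: &[f64], i: usize) {
    // ... the 1 s + 2 s of work that actually needs web_data ...
    let _ = (a_result, web_data, i);
}

#[tokio::main]
async fn main() {
    let refer_list = vec![1, 2, 3, 4, 5];

    // Start block A immediately and fetch web_data while it runs.
    let a_handle = tokio::spawn(block_a());
    let web_data: Vec<f64> = vec![1.0, 2.0, 3.0]; // stand-in for the web fetch
    let a_result = a_handle.await.unwrap();

    // Every entry of refer_list reuses the single block-A result.
    let futures = FuturesOrdered::from_iter(
        (0..refer_list.len()).map(|i| blocks_b_and_c(&a_result, &web_data, i)),
    );
    futures.collect::<()>().await;
}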
Don't use `StreamExt::into_future` here. Awaiting the `StreamFuture` waits for the first element of the stream (always `run_0`, because we use `FuturesOrdered`) to become available, then returns that first element together with the rest of the (not yet finished) stream. It does not wait for the whole stream to finish, which is what you are after.
Try using `StreamExt::collect` to wait until your stream is finished instead:
let futures = FuturesOrdered::from_iter((0..refer_list.len()).map(|i| {
    process_once(web_data.clone(), i)
}));
futures.collect::<()>().await;
Note that we are using `FuturesOrdered` here. Ordering the elements of your stream is currently unnecessary and will only make your program slower. If you don't need ordering when waiting for multiple futures with a stream, use `FuturesUnordered` instead.
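If ordering really doesn't matter, here is a self-contained sketch of the `FuturesUnordered` variant, using a hypothetical `do_work` task whose later indices finish first, so the completion order is visible:

use futures::stream::{FuturesUnordered, StreamExt};
use std::time::Duration;

// Hypothetical stand-in task: later indices sleep less, so they finish first.
async fn do_work(i: u64) {
    tokio::time::sleep(Duration::from_millis(500 - i * 100)).await;
    println!("finished {i}");
}

#[tokio::main]
async fn main() {
    // FuturesUnordered drives all futures concurrently and yields results in
    // completion order, which is enough when you only care that everything finishes.
    let futures: FuturesUnordered<_> = (0..5).map(do_work).collect();
    futures.collect::<()>().await;
}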
About running multiple futures with `FuturesOrdered`, I am still a little confused about the code below:
use futures::TryStreamExt;
use std::error::Error;
use std::{thread, time};

async fn sleep_seconds(seconds: u64) -> Result<(), Box<dyn Error>> {
    thread::sleep(time::Duration::from_secs(seconds));
    Ok(())
}

async fn run_once(unique_num: i64) -> Result<i64, Box<dyn Error>> {
    println!("lines_{}", unique_num);
    sleep_seconds(10).await?;
    Ok(unique_num)
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let unique_num_list = vec![1, 2, 3, 4];
    let futures_threads = futures::stream::FuturesOrdered::from_iter(
        (0..unique_num_list.len()).map(|i| run_once(unique_num_list[i])),
    );
    let preds_list: Vec<i64> = futures_threads.try_collect().await?;
    dbg!(preds_list);
    Ok(())
}
When I run this code, I find that the futures for [1, 2, 3, 4] do not run concurrently.
It prints lines_1, lines_2, lines_3, lines_4 one by one, with a 10-second sleep in between. If they ran concurrently, it should print all of them at once (just like awaiting web requests); so how can I achieve that when using std::thread::sleep?
Don't use `std::thread::sleep`. It will block your tokio worker thread (not just the task!), making it unable to perform other work (i.e. driving your other futures). Use `tokio::time::sleep` instead:
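For example, the `sleep_seconds` helper from the code above could become (everything else stays the same):

use std::error::Error;
use tokio::time::{sleep, Duration};

async fn sleep_seconds(seconds: u64) -> Result<(), Box<dyn Error>> {
    // tokio::time::sleep yields back to the runtime instead of blocking the
    // worker thread, so the other futures keep making progress in the meantime.
    sleep(Duration::from_secs(seconds)).await;
    Ok(())
}

With this change, all four lines_* messages print immediately and the whole stream finishes after roughly 10 seconds instead of 40.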
I picked .into_future() since you didn't give the full runnable code. And things like into_future, collect, or FuturesOrdered outside of process_once are not very relevant to the question, which asks how to save time when running the process_once function.
Also, whether the other parts of your code are correct is unclear, because there is not much information.
If you want to save the total time of running process_once multiple times, you can refactor it to run in parallel. Then it takes 180 ms instead of 5 × 180 ms (which is what you get with concurrency but no parallelism, e.g. via plain async) to finish process_once 5 times. Rust Playground
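A sketch of that refactor, assuming the heavy part of process_once is CPU-bound; here it is simulated by a 180 ms blocking sleep offloaded with tokio::task::spawn_blocking, while tokio::spawn runs the five calls as independent tasks:

use std::error::Error;
use std::time::{Duration, Instant};

// Hypothetical stand-in for process_once: ~180 ms of CPU-bound work, moved onto
// the blocking thread pool so it does not stall the async worker threads.
async fn process_once(i: usize) -> Result<usize, Box<dyn Error + Send + Sync>> {
    let n = tokio::task::spawn_blocking(move || {
        std::thread::sleep(Duration::from_millis(180)); // simulated computation
        i
    })
    .await?;
    Ok(n)
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
    let start = Instant::now();
    // Spawn each call as its own task; the heavy work runs on separate threads,
    // so five calls finish in roughly 180 ms instead of 5 * 180 ms.
    let handles: Vec<_> = (0..5).map(|i| tokio::spawn(process_once(i))).collect();
    let mut results = Vec::new();
    for handle in handles {
        results.push(handle.await??);
    }
    dbg!(results, start.elapsed());
    Ok(())
}

With enough worker and blocking threads available, the five simulated computations run side by side, so the elapsed time printed by dbg! stays close to 180 ms.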