I am attempting to learn how to run multiple tasks in parallel. I am using Tokio to do so. I am able to spawn tasks and obtain the return values within the associated closure. But I am unable to determine how to access these values outside the closure. The goal is to be able to assign the 3 vectors returned from the 3 tasks to state variables and then change the screen.
Any assistance will be appreciated. Below is the code which is in the update function of an iced program.
Message::DatabaseUpdated(rec_read, rec_updated, rec_inserted) => {
    println!("Entered DatabaseUpdated message handler...");
    self.records_read = rec_read;
    self.records_updated = rec_updated;
    self.records_inserted = rec_inserted;
    let start_date: String = get_start_date(self.time_interval).unwrap();
    let task1 = async {
        get_all_close(start_date)
    };
    let task2 = async {
        get_close(QueryType::Min)
    };
    let task3 = async {
        get_close(QueryType::Max)
    };
    println!("Beginning tokio spawn for async tasks...");
    tokio::spawn(async {
        let handle1 = tokio::spawn(task1);
        let handle2 = tokio::spawn(task2);
        let handle3 = tokio::spawn(task3);
        let results = tokio::join!(handle1, handle2, handle3);
        // let results: (
        //     Result<impl Future<Output = Result<Vec<QueryResults>, IoSqlError>>, JoinError>,
        //     Result<impl Future<Output = Result<Vec<QueryResults>, IoSqlError>>, JoinError>,
        //     Result<impl Future<Output = Result<Vec<QueryResults>, IoSqlError>>, JoinError>)
        let selected_values = results.0.unwrap().await.unwrap();
        let min_values = results.1.unwrap().await.unwrap();
        let max_values = results.2.unwrap().await.unwrap();
        println!("Selected values count : {}", selected_values.len());
        println!("Min values count : {}", min_values.len());
        println!("Max values count : {}", max_values.len());
        println!("Async tasks completed. ...................");
    });
    println!("Async tasks spawned. Waiting for completion...");
    println!("Leaving DatabaseUpdated message handler...");
    Task::none()
} // end of Message::DatabaseUpdated
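you should wrap the async operation in the Task that the update function returns, instead of returning Task::none(). the tokio::spawn() in your handler is detached, so nothing it computes can get back into the update function.
the iced runtime will schedule the task and dispatch its result to your application as a message when the task finishes, which means you should modify the Message type to carry the asynchronous data.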
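To make "modify the Message type to carry the asynchronous data" concrete, here is a minimal sketch of the receiving side only. It deliberately leaves out iced so it compiles on its own: in the real program update would return a Task<Message>. The QueryResults field, the Screen enum and the App struct are assumptions for illustration, not the original code.

```rust
// Placeholder row type; the real QueryResults comes from the database layer.
#[derive(Debug, Clone, PartialEq)]
struct QueryResults {
    close: f64, // hypothetical field
}

#[derive(Debug, Clone)]
enum Message {
    // Carries the three vectors produced by the background work.
    UpdateStateVectors(Vec<QueryResults>, Vec<QueryResults>, Vec<QueryResults>),
}

// Hypothetical screens, only here to show the "then change the screen" step.
#[derive(Debug, PartialEq)]
enum Screen {
    Loading,
    Chart,
}

struct App {
    selected_values: Vec<QueryResults>,
    min_values: Vec<QueryResults>,
    max_values: Vec<QueryResults>,
    screen: Screen,
}

impl App {
    fn new() -> Self {
        App {
            selected_values: Vec::new(),
            min_values: Vec::new(),
            max_values: Vec::new(),
            screen: Screen::Loading,
        }
    }

    // With iced this would return Task<Message>; omitted to stay dependency-free.
    fn update(&mut self, message: Message) {
        match message {
            Message::UpdateStateVectors(selected, min, max) => {
                // The data arrives inside the message, so it can be stored in state.
                self.selected_values = selected;
                self.min_values = min;
                self.max_values = max;
                self.screen = Screen::Chart;
            }
        }
    }
}

fn main() {
    let mut app = App::new();
    let row = |close: f64| QueryResults { close };
    app.update(Message::UpdateStateVectors(
        vec![row(10.0), row(12.5)],
        vec![row(10.0)],
        vec![row(12.5)],
    ));
    println!(
        "selected: {}, min: {}, max close: {}, screen: {:?}",
        app.selected_values.len(),
        app.min_values.len(),
        app.max_values[0].close,
        app.screen
    );
}
```

The point is that the state assignment happens in the handler for the message that carries the results, not in the handler that starts the work.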
Great suggestion. I updated my code and it works. In the function get_closing_values I was not able to figure out how to run the three async function calls in parallel. Below is the code.
you can spawn() them as tokio tasks just like you did before; the only difference is that the spawning now happens inside the get_closing_values() function, which is wrapped in an iced::Task.
alternatively, you can use the join combinator (tokio::join!, for example), which polls the futures concurrently on a single task. that is multiplexing rather than true parallelism, but for io bound futures it doesn't matter much.
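To illustrate what "multiplexed, not in parallel" means, here is a std-only sketch with a hand-rolled join2 that polls two futures in turn on one thread. It is a simplified stand-in, not how tokio::join! is implemented: it busy-polls with a no-op waker, whereas a real runtime only re-polls a future after it has been woken. fake_query, YieldOnce and join2 are all hypothetical names.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::pin::{pin, Pin};
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Waker that does nothing; sufficient because join2 simply re-polls in a loop.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Pending on the first poll, Ready on the second: stands in for one I/O wait.
struct YieldOnce(bool);
impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            Poll::Ready(())
        } else {
            self.0 = true;
            Poll::Pending
        }
    }
}

type Log = Rc<RefCell<Vec<String>>>;

// Hypothetical stand-in for a database query: logs its start and its end.
async fn fake_query(name: &'static str, log: Log) -> usize {
    log.borrow_mut().push(format!("{name} start"));
    YieldOnce(false).await;
    log.borrow_mut().push(format!("{name} end"));
    name.len()
}

// Polls both futures alternately on the current thread until both are done.
fn join2<A: Future, B: Future>(a: A, b: B) -> (A::Output, B::Output) {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut a = pin!(a);
    let mut b = pin!(b);
    let mut ra = None;
    let mut rb = None;
    loop {
        // A finished future must not be polled again, hence the is_none checks.
        if ra.is_none() {
            if let Poll::Ready(v) = a.as_mut().poll(&mut cx) {
                ra = Some(v);
            }
        }
        if rb.is_none() {
            if let Poll::Ready(v) = b.as_mut().poll(&mut cx) {
                rb = Some(v);
            }
        }
        if ra.is_some() && rb.is_some() {
            return (ra.take().unwrap(), rb.take().unwrap());
        }
    }
}

fn run_demo() -> (Vec<String>, (usize, usize)) {
    let log: Log = Rc::new(RefCell::new(Vec::new()));
    let results = join2(fake_query("min", log.clone()), fake_query("max", log.clone()));
    let entries = log.borrow().clone();
    (entries, results)
}

fn main() {
    let (entries, results) = run_demo();
    for entry in &entries {
        println!("{entry}");
    }
    println!("results: {:?}", results);
}
```

Both queries start before either finishes, even though only one thread is involved: while one future is waiting, the other gets polled. That interleaving is all an io bound workload needs.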
some comments on the usage of iced::Task:
you can use Task::future() with an async block, you don't need Task::perform() to handle the result with a closure.
personally, I don't use Task::perform() with a closure as the callback, I only use Task::perform() if I have a named function (most common case is a variant of the Message enum) to map the result.
you don't need to wrap a single Task into another Task with Task::batch().
you can utilize Task::batch() for concurrency, in which case you don't need to spawn() them yourself: wrap each future in its own iced::Task, then batch them up as a single iced::Task.
but each batched task delivers its own message, so you'll need to split Message::UpdateStateVectors into smaller units, each carrying a single result.
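If you go the Task::batch() route, the split messages could look roughly like the sketch below. It again leaves iced out so it stands alone, and the variant names, the Option fields and the show_chart flag are assumptions for illustration. Since the three results can arrive in any order, the state records each one and only switches the screen once all three are present.

```rust
// Placeholder row type; the real QueryResults comes from the database layer.
#[derive(Debug, Clone, PartialEq)]
struct QueryResults {
    close: f64, // hypothetical field
}

// One message per query instead of a single Message::UpdateStateVectors.
#[derive(Debug, Clone)]
enum Message {
    SelectedLoaded(Vec<QueryResults>),
    MinLoaded(Vec<QueryResults>),
    MaxLoaded(Vec<QueryResults>),
}

#[derive(Default)]
struct App {
    // None means "this query has not reported back yet".
    selected_values: Option<Vec<QueryResults>>,
    min_values: Option<Vec<QueryResults>>,
    max_values: Option<Vec<QueryResults>>,
    show_chart: bool,
}

impl App {
    // With iced this would return Task<Message>; omitted to stay dependency-free.
    fn update(&mut self, message: Message) {
        match message {
            Message::SelectedLoaded(values) => self.selected_values = Some(values),
            Message::MinLoaded(values) => self.min_values = Some(values),
            Message::MaxLoaded(values) => self.max_values = Some(values),
        }
        // Arrival order is not guaranteed, so check for completeness every time.
        self.show_chart = self.selected_values.is_some()
            && self.min_values.is_some()
            && self.max_values.is_some();
    }
}

fn main() {
    let mut app = App::default();
    let row = |close: f64| QueryResults { close };
    app.update(Message::MaxLoaded(vec![row(12.5)]));
    app.update(Message::SelectedLoaded(vec![row(10.0), row(12.5)]));
    println!("after two results, show_chart = {}", app.show_chart);
    app.update(Message::MinLoaded(vec![row(10.0)]));
    println!("after three results, show_chart = {}", app.show_chart);
    if let Some(max) = &app.max_values {
        println!("max close = {}", max[0].close);
    }
}
```

The extra bookkeeping is the price of batching. With a single join inside one task, all three vectors arrive together in one message and no completeness check is needed.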
one possible way to write the code in my personal style could look like this: (I inlined the get_closing_values() function in this example)
Message::DatabaseUpdated(rec_read, rec_updated, rec_inserted) => {
    println!("Entered DatabaseUpdated message handler...");
    self.records_read = rec_read;
    self.records_updated = rec_updated;
    self.records_inserted = rec_inserted;
    let start_date: String = get_start_date(self.time_interval).unwrap();
    // wrap the whole async operation in a Task; its output is the next Message
    let task = Task::future(async move {
        use futures::TryFutureExt;
        // join! polls all three futures concurrently and returns the
        // results in the same order as the arguments
        let (max, selected, min) = tokio::join!(
            get_close(QueryType::Max)
                .inspect_ok(|_values| println!("one is done..."))
                .unwrap_or_else(|_error| Vec::new()),
            get_all_close(start_date)
                .inspect_ok(|_values| println!("two is done..."))
                .unwrap_or_else(|_error| Vec::new()),
            get_close(QueryType::Min)
                .inspect_ok(|_values| println!("three is done..."))
                .unwrap_or_else(|_error| Vec::new()),
        );
        Message::UpdateStateVectors(selected, min, max)
    });
    println!("Leaving DatabaseUpdated message handler...");
    task
} // end of Message::DatabaseUpdated