I am trying to implement a throttle function for some async tasks using tokio::sync::Semaphore; a simplified reproduction is attached. I would like to pass a Vec of Task::run methods to throttled_execution, but since Task::run is a method, impl FnMut does not accept it the way it would accept a function pointer. (FnMut is a must, since in the real scenario Task::run will modify the Task's state.) I checked this forum and found a few threads that might be relevant,
yet I am passing a Vec of async methods, so I am not sure how to Box the function in the map call of TaskList::run_all. Can I have some help here?
async fn throttled_execution<F, Fut>(
    future_funcs: Vec<impl FnMut() -> Fut + Send + 'static>,
    num_tasks_concurrent: usize,
) -> Result<Vec<Fut::Output>>
where
    Fut: Future + Send + 'static,
    Fut::Output: Send + 'static,
{
    let semaphore = Arc::new(Semaphore::new(num_tasks_concurrent));
    let mut futures = vec![];
    for mut future_func in future_funcs.into_iter() {
        let semaphore_clone = semaphore.clone();
        futures.push(tokio::spawn(async move {
            let permit = semaphore_clone.acquire().await.unwrap();
            let result = future_func().await;
            drop(permit);
            result
        }));
    }
    let mut result = vec![];
    for future in futures {
        let item = future.await?;
        result.push(item);
    }
    Ok(result)
}
struct Task(usize);

impl Task {
    pub fn new(id: usize) -> Self {
        Self(id)
    }

    async fn run(&self) -> Result<()> {
        println!("Start task {}", self.0);
        tokio::time::sleep(Duration::new(1, 0)).await;
        println!("Finish task {}", self.0);
        Ok(())
    }
}

struct TaskList {
    tasks: Vec<Task>,
    max_parallel: usize,
}

impl TaskList {
    pub fn new(num_tasks: usize, max_parallel: usize) -> Self {
        let mut result = TaskList {
            tasks: vec![],
            max_parallel,
        };
        for i in 0..num_tasks {
            result.tasks.push(Task::new(i));
        }
        result
    }

    pub async fn run_all(&self) -> () {
        throttled_execution(
            self.tasks
                .iter()
                .map(|i| i.run)
                .collect_vec(),
            self.max_parallel,
        )
        .await;
        ()
    }
}
Your code has many other problems in addition to misunderstanding method notation. There's no syntax for automatic partial application; accessing a method without calling it is invalid. If you want to bind to a specific object, capture it in a closure.
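To illustrate the last point, here is a minimal sketch of binding a method to a specific object by capturing the object in a closure. The synchronous `id` method and the `bound_methods` helper are hypothetical, chosen so the example runs with the standard library alone; with an async method the closure body would be `task.run()` and each call would return a future instead of a value.

```rust
struct Task(usize);

impl Task {
    // Hypothetical synchronous method standing in for `run`.
    fn id(&self) -> usize {
        self.0
    }
}

// `task.id` on its own is not a value in Rust. A closure that captures
// `task` and calls the method gives the "bound method" effect instead.
fn bound_methods(tasks: &[Task]) -> Vec<impl Fn() -> usize + '_> {
    tasks.iter().map(|task| move || task.id()).collect()
}
```

Each closure here only borrows its task, so the returned Vec cannot outlive the slice it was built from. That is fine for local use but does not satisfy a 'static bound.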
That run_all consumes self and every Task in self.tasks. I am wondering whether there is a way to call run_all several times, or to call throttled_execution several times inside run_all.
Task's run also consumes self, so no. What you can do instead is take each task out of tasks, so that the Vec is empty after run_all. But I don't know whether this makes sense for your problem.
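A rough sketch of "taking each task out of tasks", assuming a `run` that consumes `self`. It uses std::thread::spawn as a standard-library stand-in for tokio::spawn (both require 'static work), and the `* 2` body is a placeholder for real work.

```rust
use std::thread;

struct Task(usize);

impl Task {
    // Consumes the task, mirroring a `run(self)` signature.
    fn run(self) -> usize {
        self.0 * 2
    }
}

struct TaskList {
    tasks: Vec<Task>,
}

impl TaskList {
    fn run_all(&mut self) -> Vec<usize> {
        // Swap the Vec out for an empty one so the tasks can be moved
        // into the spawned threads while `self` stays usable.
        let owned = std::mem::take(&mut self.tasks);
        let handles: Vec<_> = owned
            .into_iter()
            .map(|task| thread::spawn(move || task.run()))
            .collect();
        handles
            .into_iter()
            .map(|h| h.join().expect("task panicked"))
            .collect()
    }
}
```

After the first call the list is empty, so a second call has nothing to run. Whether that is acceptable depends on the problem, as noted above.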
Since Task::run(&self) only holds a reference to self and does not consume it, is there a way to have tasks use iter or iter_mut instead of into_iter?
If Task::run takes &self, you can take &self in run_all and use iter(). But then the semantics are different: all tasks will run every time you call run_all.
I find that relatively difficult to do; the lifetime of Task is hard to pin down. I changed into_iter to iter and got the following error:
lifetime may not live long enough
closure implements `Fn`, so references to captured variables can't escape the closure
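Some background on why borrowing is awkward here: throttled_execution bounds its closures and futures with 'static, and tokio::spawn has the same requirement, so closures holding references produced by self.tasks.iter() cannot be passed in. For contrast, the sketch below uses std::thread::scope, a standard-library stand-in rather than anything from Tokio, where borrowing through iter() is allowed because every thread is joined before the scope returns. The synchronous `run` and its `+ 1` body are placeholders.

```rust
use std::thread;

struct Task(usize);

impl Task {
    // Borrows the task; nothing is consumed.
    fn run(&self) -> usize {
        self.0 + 1
    }
}

struct TaskList {
    tasks: Vec<Task>,
}

impl TaskList {
    fn run_all(&self) -> Vec<usize> {
        // All scoped threads are joined before `scope` returns, so the
        // closures may hold plain references into `self.tasks`.
        thread::scope(|scope| {
            let handles: Vec<_> = self
                .tasks
                .iter()
                .map(|task| scope.spawn(move || task.run()))
                .collect();
            handles
                .into_iter()
                .map(|h| h.join().expect("task panicked"))
                .collect::<Vec<usize>>()
        })
    }
}
```

Because nothing is moved out of the list, run_all can be called repeatedly and every task runs each time.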
Okay, I think I figured out the solution. The major point is that run should take Arc<Mutex<Self>> rather than &mut self, so that each future shares ownership of its Task instead of borrowing it, and the Task lives as long as the future needs it. Here is the runnable code.
use anyhow::Result;
use std::{future::Future, sync::Arc, time::Duration};
use tokio::sync::{Mutex, Semaphore};

// Runs every closure on its own Tokio task, with at most
// `num_tasks_concurrent` of them doing work at the same time.
async fn throttled_execution<Fut>(
    future_funcs: Vec<impl FnOnce() -> Fut + Send + Sync + 'static>,
    num_tasks_concurrent: usize,
) -> Result<Vec<Fut::Output>>
where
    Fut: Future + Send + 'static,
    Fut::Output: Send + 'static,
{
    let semaphore = Arc::new(Semaphore::new(num_tasks_concurrent));
    let mut futures = vec![];
    for future_func_ref in future_funcs.into_iter() {
        let semaphore_clone = semaphore.clone();
        futures.push(tokio::spawn(async move {
            // Wait for a free permit before starting the work.
            let permit = semaphore_clone.acquire().await.unwrap();
            let result = future_func_ref().await;
            drop(permit);
            result
        }));
    }
    // Collect the results in submission order; a panicked or cancelled
    // task surfaces here as an error.
    let mut result = vec![];
    for future in futures {
        let item = future.await?;
        result.push(item);
    }
    Ok(result)
}

// Fields: (id, number of completed rounds).
struct Task(usize, usize);

impl Task {
    pub fn new(id: usize) -> Self {
        Self(id, 0)
    }

    // Taking Arc<Mutex<Self>> gives the future shared ownership of the
    // task, so it satisfies the 'static bound of throttled_execution.
    async fn run(task: Arc<Mutex<Self>>) -> Result<usize> {
        let mut task = task.lock().await;
        println!("Start task {}: Round {}", task.0, task.1);
        tokio::time::sleep(Duration::new(1, 0)).await;
        println!("Finish task {}: Round {}", task.0, task.1);
        task.1 += 1;
        Ok(task.0)
    }
}

struct TaskList {
    tasks: Vec<Arc<Mutex<Task>>>,
    max_parallel: usize,
}

impl TaskList {
    pub fn new(num_tasks: usize, max_parallel: usize) -> Self {
        let mut result = TaskList {
            tasks: vec![],
            max_parallel,
        };
        for i in 0..num_tasks {
            result.tasks.push(Arc::new(Mutex::new(Task::new(i))));
        }
        result
    }

    pub async fn run_all(&self) -> Result<()> {
        // Three rounds over the same tasks; each round clones the Arcs,
        // so the list itself is never consumed.
        for _ in 0..3 {
            let mut tasks = vec![];
            for task in self.tasks.iter() {
                let task_clone = Arc::clone(task);
                tasks.push(move || async { Task::run(task_clone).await });
            }
            throttled_execution(tasks, self.max_parallel).await?;
        }
        Ok(())
    }
}

#[tokio::main]
async fn main() {
    TaskList::new(32, 8).run_all().await.unwrap()
}
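The ownership pattern in the code above can be seen in isolation without Tokio. The sketch below is a standard-library analogue, not the code from this thread: std::thread::spawn stands in for tokio::spawn, std::sync::Mutex for tokio::sync::Mutex, and the `run_round` helper and named fields are hypothetical. There is no throttling in it; it only shows why cloning an Arc per round lets the same tasks run repeatedly.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

struct Task {
    id: usize,
    rounds: usize,
}

impl Task {
    // Owns a handle to the task, so the work can outlive the caller's borrow.
    fn run(task: Arc<Mutex<Self>>) -> usize {
        let mut task = task.lock().expect("mutex poisoned");
        task.rounds += 1;
        task.id
    }
}

// Hypothetical helper: runs every task once, each on its own thread.
fn run_round(tasks: &[Arc<Mutex<Task>>]) -> Vec<usize> {
    let handles: Vec<_> = tasks
        .iter()
        .map(|task| {
            // Cloning the Arc bumps a reference count; the Task is shared.
            let task = Arc::clone(task);
            thread::spawn(move || Task::run(task))
        })
        .collect();
    handles
        .into_iter()
        .map(|h| h.join().expect("task panicked"))
        .collect()
}
```

Calling `run_round` several times on the same slice works because each spawned job owns its own Arc clone, and the Mutex is what permits the mutation of `rounds` through shared ownership.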