Understanding how to pass a struct impl async method as a parameter to another async function

I am trying to implement a throttle function for some async tasks using tokio::sync::Semaphore; a simplified reproduction is attached. I would like to pass a Vec of Task::run to throttled_execution, but since Task::run is a method, the impl FnMut parameter does not accept it as a function pointer. (FnMut is required because in the real scenario Task::run will modify Task's state.) I searched this forum and found the following threads that might be relevant:

Async methods as parameters to another async method
How to pass a async function as a parameter to another function?

Yet I am passing a Vec of async methods, so I am not sure how to Box the function in the map call of TaskList::run_all. Could someone help?

async fn throttled_execution<F, Fut>(
    future_funcs: Vec<impl FnMut() -> Fut + Send + 'static>,
    num_tasks_concurrent: usize,
) -> Result<Vec<Fut::Output>>
where
    Fut: Future + Send + 'static,
    Fut::Output: Send + 'static,
{
    let semaphore = Arc::new(Semaphore::new(num_tasks_concurrent));

    let mut futures = vec![];
    for mut future_func in future_funcs.into_iter() {
        let semaphore_clone = semaphore.clone();

        futures.push(tokio::spawn(async move {
            let permit = semaphore_clone.acquire().await.unwrap();

            let result = future_func().await;

            drop(permit);

            result
        }));
    }

    let mut result = vec![];
    for future in futures {
        let item = future.await?;
        result.push(item);
    }

    Ok(result)
}

struct Task(usize);

impl Task {
    pub fn new(id: usize) -> Self {
        Self(id)
    }

    async fn run(&self) -> Result<()> {
        println!("Start task {}", self.0);
        tokio::time::sleep(Duration::new(1, 0)).await;
        println!("Finish task {}", self.0);

        Ok(())
    }
}

struct TaskList {
    tasks: Vec<Task>,
    max_parallel: usize,
}

impl TaskList {
    pub fn new(num_tasks: usize, max_parallel: usize) -> Self {
        let mut result = TaskList {
            tasks: vec![],
            max_parallel,
        };

        for i in 0..num_tasks {
            result.tasks.push(Task::new(i));
        }

        result
    }
    pub async fn run_all(&self) -> () {
        throttled_execution(
            self.tasks
                .iter()
                .map(|i| i.run)
                .collect_vec(),
            self.max_parallel,
        )
        .await;

        ()
    }
}

Your code has many other problems besides the misunderstanding of method notation. Rust has no syntax for automatic partial application: writing i.run (a method accessed without calling it) is invalid. If you want to bind a method to a specific object, capture that object in a closure.
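A minimal synchronous sketch of the two valid alternatives, assuming a hypothetical Counter type and call_n_times helper (neither is from the thread): capture the receiver in a closure, or use the path form Counter::bump, which is an ordinary function taking the receiver as its first argument.

```rust
// Hypothetical type standing in for `Task`; `bump` mutates state like the real `run` would.
struct Counter {
    count: u32,
}

impl Counter {
    fn bump(&mut self) -> u32 {
        self.count += 1;
        self.count
    }
}

// Hypothetical helper: accepts any stateful callable, like `throttled_execution` accepts `impl FnMut`.
fn call_n_times(mut f: impl FnMut() -> u32, n: usize) -> Vec<u32> {
    (0..n).map(|_| f()).collect()
}

fn demo() -> (Vec<u32>, u32) {
    let mut counter = Counter { count: 0 };
    // `call_n_times(counter.bump, 3)` would not compile: a method cannot be taken as a value.
    // Instead, bind the receiver by capturing `counter` in a closure.
    let results = call_n_times(|| counter.bump(), 3);
    // `Counter::bump` is a plain function whose first parameter is the receiver.
    let f: fn(&mut Counter) -> u32 = Counter::bump;
    let next = f(&mut counter);
    (results, next)
}
```

The closure form is the one that fits a Vec of callables, because each closure carries its own receiver.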

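Beyond that, other errors include:

  • dubious use of generic parameters (F is declared but never used)
  • lack of error handling (the Result returned by throttled_execution is discarded in run_all)
  • incorrect ownership structure (the closures would borrow from &self, but tokio::spawn requires 'static futures).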
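To make the three points concrete, a minimal sketch of a cleaned-up signature, using std::thread::spawn as a stand-in for tokio::spawn and omitting the semaphore; run_owned is a hypothetical name. Each callable is owned and moved into its worker, there is one type parameter per role, and a failing worker surfaces as an Err instead of being silently dropped.

```rust
use std::thread;

// Hypothetical thread-based analogue of `throttled_execution` (no throttling here):
// `F` is the callable, `T` its output, and both are actually used in the signature.
fn run_owned<F, T>(funcs: Vec<F>) -> thread::Result<Vec<T>>
where
    // `'static` because each callable is moved into a spawned thread, just as
    // `tokio::spawn` requires for the futures it runs.
    F: FnOnce() -> T + Send + 'static,
    T: Send + 'static,
{
    // Ownership: `into_iter` moves every callable into its own thread.
    let handles: Vec<_> = funcs.into_iter().map(|f| thread::spawn(f)).collect();
    // Error handling: the first worker panic is returned as an `Err`.
    handles.into_iter().map(|h| h.join()).collect()
}
```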

Here's the code that fixes all of these.


Regarding this part of the modified code:

    pub async fn run_all(self) -> Result<(), JoinError> {
        throttled_execution(
            self.tasks
                .into_iter()
                .map(|i| move || async { i.run().await })
                .collect(),
            self.max_parallel,
        )
        .await
        .map(drop)
    }

With this, run_all consumes self and every Task in self.tasks. Is there a way to call run_all multiple times, or to call throttled_execution multiple times within run_all?

Task's run also consumes self, so no. What you can do instead is move the tasks out of self.tasks (for example with mem::take), so that the Vec is empty after run_all. But I don't know whether this makes sense for your problem.

    pub async fn run_all(&mut self) -> Result<(), JoinError> {
        // Move the tasks out; self.tasks is left as an empty Vec.
        let tasks = std::mem::take(&mut self.tasks);
        throttled_execution(
            tasks
                .into_iter()
                .map(|i| move || async { i.run().await })
                .collect(),
            self.max_parallel,
        )
        .await
        .map(drop)
    }
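A minimal synchronous sketch of the mem::take approach, with hypothetical Job/JobList types standing in for Task/TaskList: run_all takes &mut self, moves the jobs out by value, and leaves an empty Vec behind, so a second call finds nothing to run.

```rust
use std::mem;

// Hypothetical stand-in for `Task`: `run` consumes `self`, like in the reply above.
struct Job(usize);

impl Job {
    fn run(self) -> usize {
        self.0
    }
}

// Hypothetical stand-in for `TaskList`.
struct JobList {
    jobs: Vec<Job>,
}

impl JobList {
    // Takes `&mut self`, so the list itself survives the call; only the jobs are moved out.
    fn run_all(&mut self) -> Vec<usize> {
        // `mem::take` swaps in `Vec::new()` and returns the old contents by value.
        let jobs = mem::take(&mut self.jobs);
        jobs.into_iter().map(Job::run).collect()
    }
}
```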

Since Task::run(&self) only borrows self and does not consume it, is there a way to iterate over tasks with iter or iter_mut instead of into_iter?

If Task::run takes &self, you can take &self in run_all and use iter(). But then the semantics are different: every Task runs each time you call run_all.
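A synchronous sketch of that difference, assuming a hypothetical Task that counts its runs in an AtomicUsize. It spawns nothing, so it sidesteps the 'static issue that comes up next; it only shows that with &self and iter() nothing is moved out, so each call to run_all runs every task again.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Hypothetical task whose `run` only borrows `self`; the atomic counter records each run.
struct Task {
    runs: AtomicUsize,
}

impl Task {
    fn run(&self) {
        self.runs.fetch_add(1, Ordering::SeqCst);
    }
}

struct TaskList {
    tasks: Vec<Task>,
}

impl TaskList {
    // `&self` plus `iter()`: the tasks stay in the list, so every call runs all of them.
    fn run_all(&self) {
        self.tasks.iter().for_each(Task::run);
    }
}
```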

I find that relatively difficult: the lifetime of each Task is hard to express. When I change into_iter to iter, I get the following error:

lifetime may not live long enough
closure implements `Fn`, so references to captured variables can't escape the closure

Well, the root of the problem is that tokio's spawn requires the spawned tasks to be 'static. Read here for more information and possible solutions:
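As background on the 'static requirement: std::thread::spawn has the same bound as tokio::spawn, because a spawned task may outlive its caller. For contrast, a minimal sketch using std::thread::scope (a standard-library facility, not part of tokio), which joins every thread before returning and therefore lets the closures borrow; sum_lengths_scoped is a hypothetical name.

```rust
use std::thread;

// `thread::scope` joins every thread spawned inside it before returning, so the
// closures may borrow `tasks`. With plain `thread::spawn` (or `tokio::spawn`),
// the same borrow would be rejected because it is not `'static`.
fn sum_lengths_scoped(tasks: &[String]) -> usize {
    thread::scope(|s| {
        let handles: Vec<_> = tasks
            .iter()
            .map(|task| s.spawn(move || task.len()))
            .collect();
        handles.into_iter().map(|h| h.join().unwrap()).sum()
    })
}
```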

If you want sharing in a concurrent context, you should probably use shared ownership (i.e., Arc), not references.
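A minimal sketch of that shared-ownership pattern with std threads instead of tokio, assuming a hypothetical Task with an id and a round counter. Each spawned closure gets its own Arc clone, so it owns what it uses and meets the 'static bound, while the caller keeps its Vec of handles for later rounds. std::sync::Mutex is enough here because no lock is held across an .await; the final async code below uses tokio::sync::Mutex because its guard is held across the sleep.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical stand-in for `Task`: an id plus the number of completed rounds.
struct Task {
    id: usize,
    rounds: usize,
}

// Runs every task once per round; the caller's slice of `Arc`s is never consumed.
fn run_rounds(tasks: &[Arc<Mutex<Task>>], rounds: usize) {
    for _ in 0..rounds {
        let handles: Vec<_> = tasks
            .iter()
            .map(|task| {
                // The clone is moved into the thread, so the closure owns its handle.
                let task = Arc::clone(task);
                thread::spawn(move || {
                    let mut guard = task.lock().unwrap();
                    guard.rounds += 1;
                    guard.id
                })
            })
            .collect();
        // Finish this round before starting the next one.
        for handle in handles {
            handle.join().unwrap();
        }
    }
}
```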

Okay, I think I figured out the solution. The key point is that run should not take &mut self but Arc<Mutex<Self>>: each spawned future then owns its own Arc handle to the task, which satisfies the 'static requirement, and the Mutex still lets run modify the task. Here is the runnable code.

use anyhow::Result;
use std::{future::Future, sync::Arc, time::Duration};
use tokio::sync::{Mutex, Semaphore};

async fn throttled_execution<Fut>(
    future_funcs: Vec<impl FnOnce() -> Fut + Send + Sync + 'static>,
    num_tasks_concurrent: usize,
) -> Result<Vec<Fut::Output>>
where
    Fut: Future + Send + 'static,
    Fut::Output: Send + 'static,
{
    let semaphore = Arc::new(Semaphore::new(num_tasks_concurrent));

    let mut futures = vec![];
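    for future_func in future_funcs.into_iter() {
        let semaphore_clone = semaphore.clone();

        futures.push(tokio::spawn(async move {
            // Wait for a free slot; at most `num_tasks_concurrent` spawned tasks hold a permit.
            let permit = semaphore_clone.acquire().await.unwrap();

            // The closure is only called once the permit is held, so the work starts here.
            let result = future_func().await;

            // Release the slot so the next waiting task can proceed.
            drop(permit);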

            result
        }));
    }

    let mut result = vec![];
    for future in futures {
        let item = future.await?;
        result.push(item);
    }

    Ok(result)
}

struct Task(usize, usize);

impl Task {
    pub fn new(id: usize) -> Self {
        Self(id, 0)
    }

    async fn run(task: Arc<Mutex<Self>>) -> Result<usize> {
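        // The guard is held across the sleep below, which is why this is
        // tokio::sync::Mutex rather than std::sync::Mutex.
        let mut task = task.lock().await;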

        println!("Start task {}: Round {}", task.0, task.1);
        tokio::time::sleep(Duration::new(1, 0)).await;
        println!("Finish task {}: Round {}", task.0, task.1);

        task.1 += 1;

        Ok(task.0)
    }
}

struct TaskList {
    tasks: Vec<Arc<Mutex<Task>>>,
    max_parallel: usize,
}

impl TaskList {
    pub fn new(num_tasks: usize, max_parallel: usize) -> Self {
        let mut result = TaskList {
            tasks: vec![],
            max_parallel,
        };

        for i in 0..num_tasks {
            result.tasks.push(Arc::new(Mutex::new(Task::new(i))));
        }

        result
    }
    pub async fn run_all(&self) -> Result<()> {
        for _ in 0..3 {
            let mut tasks = vec![];
            for task in self.tasks.iter() {
                let task_clone = Arc::clone(task);
                tasks.push(move || async { Task::run(task_clone).await });
            }

            // Propagate join errors (outer `?`) and per-task errors (inner `?`).
            for result in throttled_execution(tasks, self.max_parallel).await? {
                result?;
            }
        }

        Ok(())
    }
}

#[tokio::main]
async fn main() {
    TaskList::new(32, 8).run_all().await.unwrap()
}
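The acquire / run / release pattern in throttled_execution can be checked without tokio. A minimal sketch, assuming a hand-rolled counting semaphore built from std::sync::Mutex and Condvar (CountingSemaphore and throttled_peak are hypothetical names; this is not tokio::sync::Semaphore) and OS threads instead of async tasks; it records the peak number of jobs running at once.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

// Hypothetical counting semaphore: `permits` is the number of free slots.
struct CountingSemaphore {
    permits: Mutex<usize>,
    freed: Condvar,
}

impl CountingSemaphore {
    fn new(permits: usize) -> Self {
        Self { permits: Mutex::new(permits), freed: Condvar::new() }
    }

    fn acquire(&self) {
        let mut free = self.permits.lock().unwrap();
        // Block until a slot is free, re-checking after every wakeup.
        while *free == 0 {
            free = self.freed.wait(free).unwrap();
        }
        *free -= 1;
    }

    fn release(&self) {
        *self.permits.lock().unwrap() += 1;
        self.freed.notify_one();
    }
}

// Runs `num_jobs` jobs with at most `max_parallel` active; returns the peak observed.
fn throttled_peak(num_jobs: usize, max_parallel: usize) -> usize {
    let semaphore = Arc::new(CountingSemaphore::new(max_parallel));
    // (currently running, peak so far)
    let stats = Arc::new(Mutex::new((0usize, 0usize)));

    let handles: Vec<_> = (0..num_jobs)
        .map(|_| {
            let semaphore = Arc::clone(&semaphore);
            let stats = Arc::clone(&stats);
            thread::spawn(move || {
                semaphore.acquire();
                {
                    let mut s = stats.lock().unwrap();
                    s.0 += 1;
                    s.1 = s.1.max(s.0);
                }
                thread::sleep(Duration::from_millis(10));
                // Leave the running count before giving the slot back.
                stats.lock().unwrap().0 -= 1;
                semaphore.release();
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }
    let peak = stats.lock().unwrap().1;
    peak
}
```

Unlike tokio's SemaphorePermit, which gives its slot back when dropped, this sketch calls release explicitly, so a panicking job would leak its permit.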
