Struct with async methods and tokio ownership

I'd like to define a trait with async methods using async_trait, implement it, and use it like this:

use std::collections::HashMap;
use std::sync::{Arc, Mutex};

use async_trait::async_trait;

#[async_trait]
pub trait Runner {
    async fn execute(&self, id: usize, task: Task) -> Result<TaskAttempt>;
    async fn kill(&self, id: usize);
}

struct LocalExecutor {
    tasks: Arc<Mutex<HashMap<usize, bool>>>,
}

#[async_trait]
impl Runner for LocalExecutor {
    async fn execute(&self, id: usize, task: Task) -> Result<TaskAttempt> {
        {
            // Scope the guard so the lock is released before any .await
            let mut tasks = self.tasks.lock().unwrap();
            tasks.insert(id, true);
        }
        // Spawn the command, intermittently check the run flag
        // and kill subprocess if it is false
    }
    async fn kill(&self, id: usize) {
        let mut tasks = self.tasks.lock().unwrap();
        if let Some(run_flag) = tasks.get_mut(&id) {
            *run_flag = false;
        }
    }
}


// The problem is here:

#[tokio::test]
async fn test_stop_task() {
    let le = LocalExecutor::new();

    // Spawn a long-running task
    // Error: borrowed value does not live long enough
    let handle = spawn_local(le.execute(0, task));

    // Cannot 
    le.stop_task(0, 0).await.unwrap();
}

The idea is that I can submit a task for execution under an ID and, at a later time, choose to stop it (e.g. at a user's request). The execute method should return a TaskAttempt even if the task was killed, so I can't just abort the JoinHandle.

It looks like le needs to be moved to spawn_local to avoid the lifetime error, but then I lose the ability to call stop_task on it.

Am I approaching this with a completely wrong design idea?

Periodically checking a run flag is generally a poor way to kill tasks. Tokio already provides an abort method that you can call on the JoinHandle of a spawned task. This works even if you want execute to return something else in that case: just match on the value you get from awaiting the handle and return what you want.

As for calling the methods from a spawned task, you should probably add #[derive(Clone)] to the struct so that each task gets its own clone. The Arc ensures that the field is shared between all clones.
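A rough sketch of the clone-per-task idea follows. To stay dependency-free it uses std::thread::spawn rather than Tokio; tokio::spawn has the same 'static requirement on what it is given, so the pattern should carry over. The register and is_running helpers are hypothetical additions for the demo.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;

#[derive(Clone, Default)]
struct LocalExecutor {
    // Cloning the struct clones the Arc, not the map behind it.
    tasks: Arc<Mutex<HashMap<usize, bool>>>,
}

impl LocalExecutor {
    /// Hypothetical helper: mark a task as running.
    fn register(&self, id: usize) {
        self.tasks.lock().unwrap().insert(id, true);
    }

    /// Clear the run flag for a task, if it exists.
    fn kill(&self, id: usize) {
        let mut tasks = self.tasks.lock().unwrap();
        if let Some(run_flag) = tasks.get_mut(&id) {
            *run_flag = false;
        }
    }

    /// Hypothetical helper: read the run flag for a task.
    fn is_running(&self, id: usize) -> Option<bool> {
        self.tasks.lock().unwrap().get(&id).copied()
    }
}

fn main() {
    let le = LocalExecutor::default();

    // Move a clone into the worker; `le` itself stays usable here.
    let worker = le.clone();
    thread::spawn(move || worker.register(0)).join().unwrap();

    // The original handle sees the entry the clone inserted.
    le.kill(0);
    println!("{:?}", le.is_running(0)); // prints Some(false)
}
```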

The use case here is that LocalExecutor will use async_process::Command to spawn a child process. If the task is killed, I still want to collect the stdout and stderr from the child, which I don’t think tokio’s abort will let me do.

I’ll give it a try with the clone, thank you very much for the idea!

Even then, you should be using a select! on a channel to wait for shutdown rather than checking a boolean.

Coming back months later to say that using select! was the right solution all along. It took me a while to get around to thinking of async as event-driven programming, vs how I normally think about concurrent execution (spinning loops, etc).

select! and futures::stream::futures_unordered::FuturesUnordered gave me the tools I needed to get the behaviours I needed. Thank you!
