Force quit spawned async task

Is there any way to force quit a spawned async task using async-std? I spawn a task with let handle = task::spawn(...); and later I need to restart the whole app, so I was thinking about calling something like handle.kill(); and then respawning.

Example:

#[async_std::main]
async fn main() -> Result<(), std::io::Error> {
   ...
   let mut ping;
   while let Some(message) = receiver.recv().await {
     if message == "START" {
       ping = task::spawn(async move {
         let mut interval = stream::interval(Duration::from_secs(1));
         while let Some(_) = interval.next().await {
           println!("PING");
         }
       });
     } else if message == "RESTART" {
       ping.kill(); // hypothetical: the kind of method I am looking for
       sender.send("START").await;
     }
   }
   ...
}

How can I get this behaviour? Should I wrap the whole app in a thread and force kill that thread?

You will need the spawned task to cooperate, e.g. by having it listen for outside messages.
The crate stop-token implements such a pattern for cooperative cancellation.
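
To illustrate what "cooperate" means here, below is a minimal sketch using only the standard library. It is not the stop-token API: the stop flag, YieldNow, NoopWaker, ping_until_stopped and cooperative_demo are all made-up names for illustration, and the demo polls the future by hand instead of using a real executor. The point is just that the task checks a shared flag each time it resumes and returns on its own once the flag is set.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; enough here because the demo polls by hand.
struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Future that is Pending on the first poll and Ready on the second.
/// Hypothetical stand-in for awaiting a timer tick or a message.
struct YieldNow(bool);
impl Future for YieldNow {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            return Poll::Ready(());
        }
        self.0 = true;
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

/// The cooperating task: it checks the stop flag before every "ping"
/// and returns the number of pings once it sees the flag set.
async fn ping_until_stopped(stop: Arc<AtomicBool>) -> u32 {
    let mut pings = 0;
    while !stop.load(Ordering::SeqCst) {
        pings += 1; // stands in for println!("PING")
        YieldNow(false).await;
    }
    pings
}

/// Polls the task three times, requests a stop, then polls once more.
fn cooperative_demo() -> u32 {
    let stop = Arc::new(AtomicBool::new(false));
    let mut task = Box::pin(ping_until_stopped(stop.clone()));
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);

    for _ in 0..3 {
        // Each poll runs one loop iteration up to the next await point.
        assert!(task.as_mut().poll(&mut cx).is_pending());
    }
    stop.store(true, Ordering::SeqCst);
    match task.as_mut().poll(&mut cx) {
        Poll::Ready(pings) => pings,
        Poll::Pending => unreachable!("task should stop once the flag is set"),
    }
}
```

After the flag is set, the next poll lets the task finish by itself. A task that never checks the flag (or never reaches an await point) would keep running, which is the limitation of the cooperative approach.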

@jer thanks for your answer. I saw that project before, but it seems to me that it will not force quit the whole async tree. So let's say I have a TcpListener ... will it be force dropped (like using libc) as soon as the token is stopped? I basically need to remove everything from RAM and recursively restart the app.

Could you use the futures::future::Abortable wrapper from the futures crate to kill the spawned task?

For example, spawn task::spawn(Abortable::new(my_important_task, abort_registration)) and then call AbortHandle::abort() when you get a "RESTART" message, with the entire thing in one loop {} so it automatically restarts your main business logic.
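
In case it helps to see why this works, here is a rough std-only sketch of the idea behind an abortable wrapper. This is not how futures::future::Abortable is actually implemented (the real one also wakes the task when abort() is called); AbortableSketch, YieldNow, NoopWaker, ping_forever and abort_demo are hypothetical names, and the demo polls by hand instead of using async-std. The wrapper checks a shared flag before polling the inner future, so once the flag is set the inner future is never polled again.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; enough here because the demo polls by hand.
struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Future that is Pending on the first poll and Ready on the second.
struct YieldNow(bool);
impl Future for YieldNow {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            return Poll::Ready(());
        }
        self.0 = true;
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

/// Hypothetical, simplified stand-in for an abortable wrapper.
struct AbortableSketch<F> {
    inner: Pin<Box<F>>,
    aborted: Arc<AtomicBool>,
}

impl<F: Future> Future for AbortableSketch<F> {
    type Output = Result<F::Output, &'static str>;
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Check the flag first: an aborted task never touches `inner` again.
        if self.aborted.load(Ordering::SeqCst) {
            return Poll::Ready(Err("aborted"));
        }
        self.inner.as_mut().poll(cx).map(Ok)
    }
}

/// A task that never finishes on its own and does not know about the flag.
async fn ping_forever(pings: Arc<AtomicU32>) {
    loop {
        pings.fetch_add(1, Ordering::SeqCst); // stands in for println!("PING")
        YieldNow(false).await;
    }
}

/// Returns (number of pings before the abort, whether the wrapper reported an abort).
fn abort_demo() -> (u32, bool) {
    let pings = Arc::new(AtomicU32::new(0));
    let aborted = Arc::new(AtomicBool::new(false));
    let mut task = AbortableSketch {
        inner: Box::pin(ping_forever(pings.clone())),
        aborted: aborted.clone(),
    };
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);

    for _ in 0..3 {
        assert!(Pin::new(&mut task).poll(&mut cx).is_pending());
    }
    aborted.store(true, Ordering::SeqCst); // plays the role of AbortHandle::abort()
    let result = Pin::new(&mut task).poll(&mut cx);
    (pings.load(Ordering::SeqCst), matches!(result, Poll::Ready(Err(_))))
}
```

Since the wrapper only controls the future it wraps, it has to go inside task::spawn, as in the line above. Once the wrapper completes, the executor would normally drop it together with the inner future and everything that future owns.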

@Michael-F-Bryan I'm trying to make it work in a loop but I get a "use of moved value" error. Any ideas?

@Michael-F-Bryan OK, I have a working example for Abortable::new(task::spawn(...)) and the task is not canceled.

I was thinking something like this:

use std::time::Duration;

use async_std::{prelude::*, stream, task};
use futures::future::{AbortHandle, Abortable};

#[async_std::main]
async fn main() -> Result<(), std::io::Error> {
   ...
   // because we could get multiple START messages before a RESTART, handles
   // for all spawned tasks are stored here.
   let mut abort_handles = Vec::new();

   while let Some(message) = receiver.recv().await {
     if message == "START" {
       // the registration ties the spawned task to its abort handle
       let (handle, registration) = AbortHandle::new_pair();
       abort_handles.push(handle);

       let fut = async move {
         let mut interval = stream::interval(Duration::from_secs(1));
         while let Some(_) = interval.next().await {
           println!("PING");
         }
       };

       // wrap the future itself (not the JoinHandle returned by spawn)
       task::spawn(Abortable::new(fut, registration));
     } else if message == "RESTART" {
       // tell all running tasks to stop, clearing the abort_handles
       // list in the process
       for handle in abort_handles.drain(..) {
         handle.abort();
       }

       sender.send("START").await;
     }
   }
   ...
}

The main idea is that the AbortHandle for each spawned task gets added to a list (there's no guarantee we'll have a 1:1 relation between START and RESTART messages), then when we get a RESTART message we tell every pending task to abort and clear the list for the next batch of START messages... Does that make sense?

Ha ... you did it :). In the background, I started writing my own Future implementation :). No need for that now. Thanks!
