Tokio restart failed task

I have a number of forever-running tasks that I'd like to be able to restart on failure. In the example below, I start two "workers". These should run forever, and if one of them fails, I'd like to restart it.

async fn worker(worker_id: &str) {
  loop {
    let res = reqwest::get(...).await.unwrap();
    // process response
  }
}

#[tokio::main]
async fn main() {
  tokio::spawn(async { worker("wrk1").await; });
  tokio::spawn(async { worker("wrk2").await; });

  // start webserver
}

For now I'm just unwrapping any possible error and using panic = 'abort' to kill the entire app and have it restarted, but I'm wondering if there's a more delicate approach?

You can spawn an extra task for managing and restarting it.

Is futures::future::select_all the right tool for that? I came up with something like this (I'll still need to use idx to figure out which worker to restart):

async fn supervisor() {
	let w1 = tokio::spawn(async { worker("wrk1").await; });
	let w2 = tokio::spawn(async { worker("wrk2").await; });

	let mut workers = vec![w1, w2];
	loop {
		let supervisor = futures::future::select_all(workers);
		workers = match supervisor.await {
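			(Err(_), _idx, mut workers) => {
				// TODO: use _idx to decide which worker failed; "wrk1" is hardcoded for now
				let restarted = tokio::spawn(async { worker("wrk1").await; });
				workers.push(restarted);
				workers
			},
			(Ok(_), _, _) => {
				// TODO: should not happen, just return an empty vec for now
				// (note: select_all panics when given an empty collection)
				vec![]
			}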
		}
	}
}

I would not use select_all. Just spawn a separate manager per task you want to manage.

Do you have an example you could give? Coming from Erlang/Elixir, it seems you’d want some supervisor which knows which children (tasks) it manages.

In the case of system shutdown you probably want to shut things down in a particular order (stop accepting new web requests, allow any background work to finish, then shut down the database pool).

Would be nice to see a code example for this pattern.

You can do it like this:

loop {
    let res = tokio::spawn(your_fallible_task()).await;
    match res {
        Ok(output) => { /* handle successful exit */ },
        Err(err) if err.is_panic() => { /* handle panic in task, e.g. by going around loop to restart task */ },
        Err(err) => { /* handle other errors (mainly runtime shutdown) */ },
    }
}

As for shutdown, check out this page.
