Tokio runtime panics at shutdown

Hi everyone,

I have a problem with shutting down my Tokio runtime gracefully. This time I actually know what causes the error; I just can't fix it. Here's the piece of code that matters (a shortened version of my main function):

#[tokio::main(flavor = "multi_thread")]
async fn main() -> XibendResult<()> {
    /* snip */
    static CLIENT: Lazy<Client> = Lazy::new(Client::new);
    static RT: Lazy<Handle> = Lazy::new(Handle::current);

    // Start the cache manager thread
    RT.spawn_blocking(|| {
        RT.block_on(async {
            loop {
                // Refresh common data cache
                if let Err(err) = XibendCache::refresh_common_data(&CLIENT).await {
                    error!("CommonDataCache: {err:?}");
                }

                // Refresh weather symbol cache
                if let Err(err) = XibendCache::refresh_weather_symbols(&CLIENT).await {
                    error!("WeatherSymbolCache: {err:?}");
                }

                info!("Caches were refreshed successfully");

                // Wait for 5 minutes
                sleep(Duration::from_secs(300)).await;
            }
        });
    });

    // Create the socket & the router
    let socket = SocketAddr::new(*ADDRESS, *PORT);
    let router = Router::new();
        /* many routes */

    // Start the Axum HTTP server
    Server::bind(&socket)
        .serve(router.into_make_service())
        .with_graceful_shutdown(async {
            tokio::signal::ctrl_c()
                .await
                .expect("Failed to catch the SIGINT signal")
        })
        .await
        .expect("Failed to start the Axum server");
    Ok(())
}

And when I press Ctrl + C, I get the following panic:

thread 'tokio-runtime-worker' panicked at 'A Tokio 1.x context was found, but it is being shutdown.', /home/miika/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.28.0/src/runtime/time/entry.rs:553:13
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

Now, I know that this panic is related to the cache manager thread I start before the server. I think the problem occurs when the runtime starts shutting down but the thread (which runs endlessly) still wants to continue. The question is: how can I shut down everything gracefully? I found a single post on the same topic (Tokio panics during shutdown) but I couldn't figure out how to apply it in my case. Any help would be greatly appreciated!

Basically, to prevent that error, you must not return from main until all spawned tasks have exited. There's some information on this here.
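In other words: keep the JoinHandle instead of detaching the task, and await it before returning. A minimal sketch of that shape (the sleep is a stand-in for the cache loop, and the stop mechanism is what the fixes below add):

use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Keep the JoinHandle instead of detaching the task.
    let cache_task = tokio::task::spawn_blocking(|| {
        // Stand-in for the cache loop; real code also needs a way
        // to tell the loop to stop (see the fixes below).
        std::thread::sleep(Duration::from_secs(1));
    });

    // ... serve requests here ...

    // Don't return from main while the task may still be running.
    cache_task.await?;
    Ok(())
}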


For your code snippet, here's a simple fix:

use axum::{Router, Server};
use std::{net::SocketAddr, time::Duration};
use tokio::{runtime::Handle, select, time::sleep};

#[tokio::main(flavor = "multi_thread")]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create the socket & the router
    let socket = SocketAddr::new("0.0.0.0".parse()?, 12678);
    let router = Router::new();
    /* many routes */

    // Start the Axum HTTP server
    let task_server = tokio::spawn(
        Server::bind(&socket)
            .serve(router.into_make_service())
            .with_graceful_shutdown(async {
                tokio::signal::ctrl_c()
                    .await
                    .expect("Failed to catch the SIGINT signal")
            }),
    );

    // Start the cache manager thread
    Handle::current()
        .spawn_blocking(move || {
            Handle::current().block_on(async move {
                // ...

                select! {
                    _ = sleep(Duration::from_secs(300)) => println!("sleep done"),
                    ctrl_c = task_server => println!("server closed: {ctrl_c:?}"),
                }
            });
        })
        .await?;
    Ok(())
}

$ cargo r -q
^Cserver closed: Ok(Ok(()))

In more complicated real-world code, you can use a channel to communicate between tasks.
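For example, a watch channel can carry the shutdown signal; conveniently, a receiver still sees a value that was sent before it started waiting, so nothing is lost. A minimal standalone sketch of that idea (not tied to the axum code above):

use std::time::Duration;
use tokio::{select, sync::watch, time::sleep};

#[tokio::main]
async fn main() {
    let (shutdown_tx, mut shutdown_rx) = watch::channel(false);

    let worker = tokio::spawn(async move {
        loop {
            select! {
                _ = sleep(Duration::from_secs(300)) => { /* refresh caches */ },
                _ = shutdown_rx.changed() => break,
            }
        }
    });

    // Stand-in for "after ctrl_c": tell the worker to stop...
    let _ = shutdown_tx.send(true);

    // ...and make sure it has exited before main returns.
    worker.await.unwrap();
}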

I'd really like to keep the axum server on the main thread, is that possible somehow?

Oh, I forgot to keep the server running... The fix above lets the server shut down as soon as the blocking task finishes, which is usually not what you want. So the second fix is via Barrier in tokio::sync - Rust:

use axum::{Router, Server};
use std::{net::SocketAddr, sync::Arc, time::Duration};
use tokio::{runtime::Handle, select, sync::Barrier, time::sleep};

#[tokio::main(flavor = "multi_thread")]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create the socket & the router
    let socket = SocketAddr::new("0.0.0.0".parse()?, 12678);
    let router = Router::new();
    /* many routes */

    // wait for ctrl-c: can be given to multiple tasks
    let barrier = Arc::new(Barrier::new(2));

    // Start the cache manager thread
    let blocking_task = {
        let barrier = barrier.clone();
        Handle::current().spawn_blocking(move || {
            Handle::current().block_on(async move {
                // ...

                select! {
                    _ = sleep(Duration::from_secs(3)) => println!("sleep done"),
                    _ = barrier.wait() => println!("server closed"),
                }
            });
        })
    };

    // Start the Axum HTTP server
    Server::bind(&socket)
        .serve(router.into_make_service())
        .with_graceful_shutdown(async move {
            tokio::signal::ctrl_c()
                .await
                .expect("Failed to catch the SIGINT signal");
            barrier.wait().await;
        })
        .await?;

    // ensure the blocking_task is finished
    blocking_task.await?;

    Ok(())
}

The key point is to tell the task to finish before the runtime shuts down. Otherwise, you'll get the runtime worker's panic.


Thank you, this fix works! For any future readers, I use this code in the blocking task:

loop {
    // ... refresh the caches ...
    select! {
        _ = sleep(Duration::from_secs(300)) => {},
        _ = barrier.wait() => break,
    }
}

The select! sits inside the loop because the task runs endlessly, and I want to break out of the loop if a shutdown signal is received.

I found that Barrier is heavier than Notify in tokio::sync - Rust in your case: Barrier is built on a Mutex plus a watch channel, which in turn needs a RwLock, a Notify, and so on. But luckily the third fix is similar to the second:

-let barrier = Arc::new(Barrier::new(2));
+let notify = Arc::new(Notify::new());

-let barrier = barrier.clone();
+let notify = notify.clone();

-select! { ..., _ = barrier.wait()
+select! { ..., _ = notify.notified()

-barrier.wait().await;
+notify.notify_waiters();
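Put together, a standalone sketch of the Notify variant could look like this. Note one swap for the demo's sake: I call notify_one() instead of notify_waiters(), because notify_one() stores a permit when no task is waiting yet, whereas notify_waiters() only wakes tasks that are already inside notified(); in the real code the signal comes from ctrl_c, by which time the loop is parked in select!.

use std::{sync::Arc, time::Duration};
use tokio::{runtime::Handle, select, sync::Notify, time::sleep};

#[tokio::main]
async fn main() {
    let notify = Arc::new(Notify::new());

    // The cache-manager loop, shaped like the fixes above.
    let blocking_task = {
        let notify = notify.clone();
        Handle::current().spawn_blocking(move || {
            Handle::current().block_on(async move {
                loop {
                    // ... refresh caches here ...
                    select! {
                        _ = sleep(Duration::from_secs(300)) => {},
                        _ = notify.notified() => break,
                    }
                }
            });
        })
    };

    // Stand-in for the graceful-shutdown future after ctrl_c;
    // notify_one() stores a permit, so the signal isn't lost even
    // if it arrives before the loop reaches notified().
    notify.notify_one();

    // Make sure the task is finished before the runtime shuts down.
    blocking_task.await.unwrap();
}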
