How to have `JoinHandle` inside `rocket::Fairing` for shutdown?

I want to spawn a worker thread alongside Rocket and terminate and join it from a corresponding `rocket::Fairing` while Rocket is shutting down:

/*
[dependencies]
[dependencies.rocket]
version = "=0.5.0"
features = ['json']
*/

use rocket::fairing::{Fairing, Info, Kind};
use std::{
    sync::{
        mpsc::{self, Sender, TryRecvError},
    },
    thread::{self, JoinHandle},
    time::Duration,
};

enum Notification {
    Cancel,
}

struct StatusConsumerShutdown {
    handle: JoinHandle<()>,
    tx: Sender<Notification>,
}

#[rocket::async_trait]
impl Fairing for StatusConsumerShutdown {
    fn info(&self) -> Info {
        Info {
            name: "Status Consumer Shutdown Fairing",
            kind: Kind::Shutdown,
        }
    }

    /// The signature is fixed by the `Fairing` trait, so it cannot take `self` by value.
    async fn on_shutdown(&self, _rocket: &rocket::Rocket<rocket::Orbit>) {
        if let Err(e) = self.tx.send(Notification::Cancel) {
            println!(
                "Could not send cancel notification: error:\n{}", e
            );
        }

        // I know that the following loop blocks the executor,
        // but that is not relevant to the question.
        while !self.handle.is_finished() {
            self.handle.join(); // DOES NOT COMPILE: `join` takes `self`, but only `&self` is available.
        }
    }
}

pub fn spawn_status_consumer() -> impl Fairing {
    let (tx, rx) = mpsc::channel::<Notification>();

    let handle = thread::spawn(move || loop {
        println!("Working...");
        thread::sleep(Duration::from_millis(500));

        match rx.try_recv() {
            Ok(Notification::Cancel) => {
                println!("Received cancelation -> Terminating.");
                break;
            }
            Err(TryRecvError::Disconnected) => {
                println!("Parent thread disconnected -> Terminating.");
                break;
            }
            Err(TryRecvError::Empty) => {}
        }
    });

    StatusConsumerShutdown { handle, tx }
}

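#[rocket::main]
async fn main() {
    let shutdown_fairing = spawn_status_consumer();
    // `launch()` returns a future; nothing runs unless it is awaited.
    let _ = rocket::build().attach(shutdown_fairing).launch().await;
}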

I know that `JoinHandle::join(self)` takes ownership and consumes the handle. How can I make the above compile? I thought about wrapping the `JoinHandle` in a `Box`, but that does not help either.

Not sure: technically, I think I don't need to join the thread at all, since it should exit once the `tx` handle held by `StatusConsumerShutdown` is dropped...

If `on_shutdown` were modified to take `&mut self`, it would be pretty easy:

(&mut self.handle).await;

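This works for an async task handle such as `tokio::task::JoinHandle`, since a mutable reference to an `Unpin` future is itself a future. Note that `std::thread::JoinHandle`, as used in the question, is not a future and cannot be awaited.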

With &self, you will need a different mechanism than JoinHandle.
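The point that a mutable reference to a future is itself a future can be checked without an async runtime. The following is a minimal sketch using only the standard library: `Holder`, `poll_once`, and `NoopWaker` are hypothetical names introduced for illustration, and `std::future::Ready` stands in for an async task handle.

```rust
use std::future::{self, Future};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; enough to poll a future by hand once.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Polls any `Unpin` future exactly once.
fn poll_once<F: Future + Unpin>(mut fut: F) -> Poll<F::Output> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    Pin::new(&mut fut).poll(&mut cx)
}

/// Hypothetical owner of a future, standing in for a struct that
/// stores an async task handle.
struct Holder {
    fut: future::Ready<u32>,
}

impl Holder {
    /// Needs only `&mut self`: the future is polled through a
    /// reference instead of being moved out of the struct.
    fn poll_in_place(&mut self) -> Poll<u32> {
        // `&mut Ready<u32>` satisfies the `F: Future + Unpin` bound.
        poll_once(&mut self.fut)
    }
}

fn main() {
    let mut holder = Holder { fut: future::ready(42) };
    assert_eq!(holder.poll_in_place(), Poll::Ready(42));
}
```

The same bound is not satisfied by `&F`, which is why a shared `&self` is not enough on its own.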

OK, yes, but I cannot change the signature of this trait; it's `rocket::Fairing`.

Then you can use interior mutability to get a mutable reference, and wrap the handle in an `Option` so that you can take an owned value out through that mutable reference.

I don't have much experience with `async_trait`, so I don't know whether a `RefCell` would be enough. If not, you can always use a `Mutex`, perhaps an async-aware one.

For example, with a mutex, it looks something like:

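use std::sync::Mutex;

struct StatusConsumerShutdown {
    // `Option` lets the handle be moved out later; `Mutex` allows that through `&self`.
    handle: Mutex<Option<JoinHandle<()>>>,
    tx: Sender<Notification>,
}

async fn on_shutdown(&self, ...) {
    //...
    // `take()` moves the handle out and leaves `None` behind.
    if let Some(handle) = self.handle.lock().unwrap().take() {
        handle.join().unwrap();
    }
}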

I don't understand the logic in your original example: you cannot call a method that takes `self` by value inside a loop, so I removed the loop. `JoinHandle::join()` blocks until the thread finishes and returns immediately if it already has, so the `is_finished()` check is redundant anyway.
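To make the pattern above concrete without depending on Rocket, here is a minimal standard-library sketch. `Worker` and its `shutdown(&self)` method are hypothetical stand-ins for the fairing and `on_shutdown`; the unit-typed channel replaces the `Notification` enum for brevity.

```rust
use std::sync::mpsc::{self, RecvTimeoutError, Sender};
use std::sync::Mutex;
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Hypothetical stand-in for the fairing: shutdown only gets `&self`.
struct Worker {
    handle: Mutex<Option<JoinHandle<()>>>,
    tx: Sender<()>,
}

impl Worker {
    fn spawn() -> Self {
        let (tx, rx) = mpsc::channel::<()>();
        let handle = thread::spawn(move || loop {
            match rx.recv_timeout(Duration::from_millis(10)) {
                // No message yet: keep working.
                Err(RecvTimeoutError::Timeout) => {}
                // Cancel received, or the sender was dropped: stop.
                Ok(()) | Err(RecvTimeoutError::Disconnected) => break,
            }
        });
        Worker {
            handle: Mutex::new(Some(handle)),
            tx,
        }
    }

    /// Returns `true` if this call joined the thread and `false` if
    /// the handle had already been taken by an earlier call.
    fn shutdown(&self) -> bool {
        // Ignore the error case: the worker may already have exited.
        let _ = self.tx.send(());
        // `take()` moves the handle out and leaves `None` behind.
        let handle = self.handle.lock().unwrap().take();
        match handle {
            Some(h) => {
                h.join().expect("worker thread panicked");
                true
            }
            None => false,
        }
    }
}

fn main() {
    let worker = Worker::spawn();
    assert!(worker.shutdown()); // first call joins the thread
    assert!(!worker.shutdown()); // second call finds `None`
}
```

Because the `Option` is left as `None`, calling the shutdown method twice is harmless, which a bare `JoinHandle` field could not express.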

Thanks for this solution; it works now, but I am a bit puzzled.

I might still have trouble understanding interior mutability. What I think happens: `&self` is not `mut`, so nothing behind it can change, which also means you cannot move any members out of the struct, e.g. `handle` or `tx`. The exception is that `Mutex` provides interior mutability: `lock(&self).unwrap()` returns a `MutexGuard<'_, Option<...>>`, which implements `Deref` and `DerefMut`, so you can call `.take()` to move the handle out (leaving `None` in its place) and then call `join` on it.
Interior mutability is a strange thing...

Here's one way to think of it. The language has the mut keyword to control mutability statically, meaning that safety problems related to mutability, such as allowing two different threads to concurrently modify the same object, can be caught by the compiler.

But there are other ways to check for such errors at runtime. One way is using a Mutex, which is a lock, to block other threads attempting to modify an object, while one thread is modifying it. Mutexes can be implemented internally using atomics and with other techniques.

But a Mutex is only one of many possibilities for providing such runtime checks. So rather than building the Mutex type into the language, Rust provides the interior mutability pattern so that such things can be implemented by libraries, including the std library that contains the Mutex type.

Interior mutability is also explained in the book.
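As a small illustration of "checked at runtime instead of by the compiler", the sketch below uses two standard-library types, `RefCell` (single-threaded) and `Mutex` (thread-safe). `Counter` is a hypothetical type introduced only for this example.

```rust
use std::cell::RefCell;
use std::sync::Mutex;

/// Hypothetical counter that can be incremented through `&self`.
struct Counter {
    hits: RefCell<u32>,
}

impl Counter {
    /// Mutates through a shared reference; `RefCell` checks at
    /// runtime that no other borrow is active.
    fn hit(&self) {
        *self.hits.borrow_mut() += 1;
    }
}

fn main() {
    let counter = Counter { hits: RefCell::new(0) };
    counter.hit();
    counter.hit();
    assert_eq!(*counter.hits.borrow(), 2);

    // The "one writer at a time" rule still holds, but it is
    // enforced when the program runs, not when it compiles.
    let first = counter.hits.borrow_mut();
    assert!(counter.hits.try_borrow_mut().is_err());
    drop(first);
    assert!(counter.hits.try_borrow_mut().is_ok());

    // `Mutex` applies the same idea across threads: a second lock
    // attempt fails (or, with `lock()`, waits) while a guard is alive.
    let shared = Mutex::new(0u32);
    let guard = shared.lock().unwrap();
    assert!(shared.try_lock().is_err());
    drop(guard);
    assert!(shared.try_lock().is_ok());
}
```

`RefCell` reports a conflicting borrow as an error (or a panic with `borrow_mut`), whereas `Mutex::lock` makes the second caller wait; both are library types, not language features.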

The maintainer of Rocket also gave this answer, which I cannot get to work.
It basically reverses the roles: the consumer waits for the `Shutdown` future to complete and then sends a message over the oneshot channel `tx_consumer_done`, and the fairing tries to receive it. But now I have the same problem, since `rx_consumer_done.await` apparently needs to take the receiver by value, which I cannot do through `&self`...

Playground

/*
[dependencies]
[dependencies.rocket]
version = "=0.5.0"
features = ['json']
*/

use rocket::{
    fairing::{Fairing, Info, Kind},
    tokio::{
        self, select,
        sync::oneshot::{Receiver, Sender},
    },
    Shutdown,
};
use std::time::Duration;

enum Notification {
    Cancel,
}

/// Rocket fairing to wait for consumer to be done.
pub struct WaitForConsumerDone {
    pub rx_consumer_done: Receiver<()>,
}

/// Helper data for the status consumer to react to a shutdown and to acknowledge it
/// via `tx_consumer_done`.
pub struct WaitForShutdown {
    pub shutdown: Shutdown,
    pub tx_consumer_done: Sender<()>,
}

#[rocket::async_trait]
impl Fairing for WaitForConsumerDone {
    fn info(&self) -> Info {
        Info {
            name: "Status Consumer Shutdown Fairing",
            kind: Kind::Shutdown,
        }
    }

    async fn on_shutdown(&self, _rocket: &rocket::Rocket<rocket::Orbit>) {
        // DOES NOT COMPILE: awaiting consumes the receiver, but only `&self` is available.
        self.rx_consumer_done.await;
    }
}

/// Spawn a status queue consumer thread.
pub async fn spawn_status_consumer(
    wait_for_shutdown: WaitForShutdown,
) {
    tokio::task::spawn(async move {
        loop {
            select! {
                _ = wait_for_shutdown.shutdown.clone() => break,
                _ = async {
                    tokio::time::sleep(Duration::from_millis(100)).await;
                } => ()
            }
        }

        // Notify that we finished.
        // `send` fails only if the receiver is already gone; nothing to do in that case.
        let _ = wait_for_shutdown.tx_consumer_done.send(());
    });
}

fn main() {
}
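One possible way around the `&self` problem in the code above is the same `Mutex<Option<...>>` idea suggested earlier in the thread: store the receiver in an `Option`, take it out, and await the owned value. The sketch below is an assumption-laden illustration using only the standard library, not the Rocket maintainer's solution. `WaitForDone`, `block_on`, and `NoopWaker` are hypothetical names, `std::future::ready(())` stands in for the `oneshot::Receiver<()>`, and the method returns a `bool` purely so the behaviour can be observed.

```rust
use std::future::{self, Future};
use std::pin::pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for the fairing; `F` plays the role of the
/// `oneshot::Receiver<()>` field.
struct WaitForDone<F> {
    done: Mutex<Option<F>>,
}

impl<F: Future<Output = ()>> WaitForDone<F> {
    /// Same shape as `on_shutdown`: only `&self` is available.
    /// Returns `true` if this call awaited the stored future.
    async fn on_shutdown(&self) -> bool {
        // Separate `let`: the guard is dropped at the end of this
        // statement, so no lock is held across the `.await` below.
        let fut = self.done.lock().unwrap().take();
        match fut {
            Some(fut) => {
                fut.await;
                true
            }
            None => false,
        }
    }
}

/// Waker that does nothing; used by the demo executor below.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Minimal busy-polling executor, only for this demo. In the real
/// program Rocket's runtime drives the future.
fn block_on<T>(fut: impl Future<Output = T>) -> T {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
        std::thread::yield_now();
    }
}

fn main() {
    let fairing = WaitForDone {
        done: Mutex::new(Some(future::ready(()))),
    };
    assert!(block_on(fairing.on_shutdown())); // awaited the future
    assert!(!block_on(fairing.on_shutdown())); // already taken
}
```

With the real `oneshot::Receiver<()>`, the awaited value is a `Result`, so the caller would decide whether to inspect or ignore it. Taking the value in its own statement matters with `std::sync::Mutex`, because holding its guard across an `.await` makes the resulting future non-`Send`.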