Implementing the Drop trait to do clean-up work leads to the error "cannot move out of `_` which is behind a shared reference"

I am building a client that writes data in batches to a backend datastore. Part of the functionality is to trigger a write when either the batch size or a defined time interval is reached. That part of the code works fine. I have a function close, which the user of the client is supposed to call when all work is done, in order to execute a final batch write with whatever is left in the write buffer and properly terminate the timer thread. However, I would like to ensure this cleanup work is also triggered when the client is dropped, even if the user has omitted to call close, as a safety net, so to speak.

In simplified form, my code looks like the following:

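use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};
use std::time::Duration;

#[derive(Debug, Clone)]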
enum EventType {
    Data(String),
    TimeTrigger,
}

struct TimedThing {
    timer: Option<JoinHandle<()>>,
    end_timer: Arc<Mutex<bool>>,
    sender: Sender<EventType>,
    listener: Option<JoinHandle<()>>,
}

impl TimedThing {
    pub fn new() -> Self {
        let (sender, receiver): (Sender<EventType>, Receiver<EventType>) = channel();
        let end_timer = Arc::new(Mutex::new(false));
        let inner_sender = sender.clone();

        let mut me = TimedThing {
            timer: None,
            end_timer,
            sender: inner_sender,
            listener: None,
        };

        let end_timer = Arc::clone(&me.end_timer);

        let timer_sender = sender.clone();
        let timer_thread = thread::spawn(move || {
            loop {
                // send timer ticks as long as the end_timer flag is not turned on
                if *end_timer.lock().unwrap() == false {
                    thread::sleep(Duration::from_millis(1000));
                    let data = EventType::TimeTrigger;
                    timer_sender.send(data.clone()).unwrap();
                } else {
                    printfmt!("end_timer is now {}", end_timer.lock().unwrap());
                    break;
                }
            }
        });

        let listener_thread = thread::spawn(move || {
            loop {
                // read from the channel
                let event_received = receiver.recv();
                match event_received {
                    Ok(event) => match event {
                        EventType::Data(msg) => {
                            printfmt!("{}", msg);
                        }
                        EventType::TimeTrigger => {
                            printfmt!("{}", "Time trigger received".to_string());
                        }
                    },
                    Err(_err) => {
                        printfmt!("Listener: Sender channel is gone - closing listener");
                        break;
                    }
                }
            }
        });

        me.timer = Some(timer_thread);
        me.listener = Some(listener_thread);

        me
    }

    pub fn process_input(&self, msg: String) {
        self.sender.send(EventType::Data(msg)).unwrap();
    }

    pub fn close(&mut self) {
        *self.end_timer.lock().unwrap() = true;
        printfmt!(
            "end_timer is now set to: {}",
            self.end_timer.lock().unwrap()
        );
        match &self.timer {
            Some(t) => {
                t.join().unwrap();
            }
            None => {}
        }
        // self.timer.unwrap().join().unwrap();
    }
}

impl Drop for TimedThing {
    fn drop(&mut self) {
        self.close();
    }
}

fn main() {
    printfmt!("Main is creating TimedThing instance now...");
    let mut tt = TimedThing::new();
    printfmt!("Main is going to sleep now...");
    thread::sleep(Duration::from_secs(3));
    tt.process_input("This is a message from main".to_string());
    thread::sleep(Duration::from_millis(1500));
    tt.process_input("This is another message from main".to_string());
    thread::sleep(Duration::from_secs(5));
    printfmt!("Main is done, TimedThing is now being closed...");
    tt.close();
    printfmt!("Main is ending now...");
}

However, the above code leads to the error

error[E0507]: cannot move out of `*t` which is behind a shared reference
    --> src\main.rs:90:17
     |
90   |                 t.join().unwrap();
     |                 ^ ------ `*t` moved due to this method call
     |                 |
     |                 move occurs because `*t` has type `JoinHandle<()>`, which does not implement the `Copy` trait
     |
note: `JoinHandle::<T>::join` takes ownership of the receiver `self`, which moves `*t`
    --> C:\Users\Dani\.rustup\toolchains\stable-x86_64-pc-windows-msvc\lib/rustlib/src/rust\library\std\src\thread\mod.rs:1649:17
     |
1649 |     pub fn join(self) -> Result<T> {
     |                 ^^^^

It is clear that I can get this working if I leave out the implementation of Drop and change the function signature from pub fn close(&mut self) to pub fn close(self), so that the function takes ownership of self and can pass ownership to t.join(). On the other hand, the Drop trait requires drop to take a mutable reference to self. I have tried several combinations, but it always fails on the fact that the Drop trait has to be implemented with a mutable reference to self, while the join() function of JoinHandle<()> wants to take ownership.

Is there a way to get this working? I have read through a number of topics in this forum as well as Stack Overflow questions, but none of the suggestions given help in this scenario. I am sure I am missing some fundamental point.

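Try self.timer.take().unwrap().join().unwrap();

Option::take moves the JoinHandle out of the field and leaves None behind, so join() receives an owned value even though close only has &mut self. Note that the first unwrap() will panic if close has already run, which is what happens when drop calls close after an explicit tt.close(); matching on the result of take() avoids that.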
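For illustration, here is a minimal sketch of the take() pattern wired into Drop. Worker is a hypothetical, trimmed-down stand-in for TimedThing that keeps only the timer thread, and is_closed is a helper added purely for this example. The if let in place of unwrap() is my own assumption about what you want: it lets close be called explicitly and then again from drop without panicking.

```rust
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Hypothetical stand-in for `TimedThing`, reduced to the timer thread.
struct Worker {
    timer: Option<JoinHandle<()>>,
    end_timer: Arc<Mutex<bool>>,
}

impl Worker {
    fn new() -> Self {
        let end_timer = Arc::new(Mutex::new(false));
        let flag = Arc::clone(&end_timer);
        let timer = thread::spawn(move || {
            // Poll the flag until close() turns it on. The lock guard is
            // released before the loop body runs, so we never sleep while
            // holding the mutex.
            while !*flag.lock().unwrap() {
                thread::sleep(Duration::from_millis(10));
            }
        });
        Worker {
            timer: Some(timer),
            end_timer,
        }
    }

    /// Stops the timer thread. Calling it a second time is a no-op.
    fn close(&mut self) {
        *self.end_timer.lock().unwrap() = true;
        // take() moves the handle out of the Option and leaves None behind,
        // so join() gets an owned JoinHandle although we only hold &mut self.
        if let Some(handle) = self.timer.take() {
            handle.join().unwrap();
        }
    }

    /// Example-only helper: true once the timer thread has been joined.
    fn is_closed(&self) -> bool {
        self.timer.is_none()
    }
}

impl Drop for Worker {
    fn drop(&mut self) {
        // Safety net: runs the same clean-up if the user never called close().
        self.close();
    }
}
```

With this shape, creating a Worker, calling close() and then letting it go out of scope joins the thread exactly once; dropping it without calling close() joins it from drop instead.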

