I am building a client that writes data in batches to a backend datastore. Part of the functionality is to trigger a write when either the batch size or a defined time interval is reached. That part of the code works fine. I have a function `close` which the user of the client is supposed to call when all work is done, in order to execute a final batch write with whatever is left in the write buffer and properly terminate the timer thread. However, I would like to ensure this cleanup work is also triggered when the client is dropped, even if the user has forgotten to call `close`, as a safety net, so to speak.
In simplified form, my code looks like this:
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};
use std::time::Duration;

#[derive(Debug, Clone)]
enum EventType {
    Data(String),
    TimeTrigger,
}

struct TimedThing {
    timer: Option<JoinHandle<()>>,
    end_timer: Arc<Mutex<bool>>,
    sender: Sender<EventType>,
    listener: Option<JoinHandle<()>>,
}

impl TimedThing {
    pub fn new() -> Self {
        let (sender, receiver): (Sender<EventType>, Receiver<EventType>) = channel();
        let end_timer = Arc::new(Mutex::new(false));
        let inner_sender = sender.clone();
        let mut me = TimedThing {
            timer: None,
            end_timer,
            sender: inner_sender,
            listener: None,
        };
        let end_timer = Arc::clone(&me.end_timer);
        let timer_sender = sender.clone();
        let timer_thread = thread::spawn(move || {
            loop {
                // send timer ticks as long as the end_timer flag is not turned on
                if *end_timer.lock().unwrap() == false {
                    thread::sleep(Duration::from_millis(1000));
                    let data = EventType::TimeTrigger;
                    timer_sender.send(data.clone()).unwrap();
                } else {
                    printfmt!("end_timer is now {}", end_timer.lock().unwrap());
                    break;
                }
            }
        });
        let listener_thread = thread::spawn(move || {
            loop {
                // read from the channel
                let event_received = receiver.recv();
                match event_received {
                    Ok(event) => match event {
                        EventType::Data(msg) => {
                            printfmt!("{}", msg);
                        }
                        EventType::TimeTrigger => {
                            printfmt!("{}", "Time trigger received".to_string());
                        }
                    },
                    Err(_err) => {
                        printfmt!("Listener: Sender channel is gone - closing listener");
                        break;
                    }
                }
            }
        });
        me.timer = Some(timer_thread);
        me.listener = Some(listener_thread);
        me
    }

    pub fn process_input(&self, msg: String) {
        self.sender.send(EventType::Data(msg)).unwrap();
    }

    pub fn close(&mut self) {
        *self.end_timer.lock().unwrap() = true;
        printfmt!(
            "end_timer is now set to: {}",
            self.end_timer.lock().unwrap()
        );
        match &self.timer {
            Some(t) => {
                t.join().unwrap();
            }
            None => {}
        }
        // self.timer.unwrap().join().unwrap();
    }
}

impl Drop for TimedThing {
    fn drop(&mut self) {
        self.close();
    }
}

fn main() {
    printfmt!("Main is creating TimedThing instance now...");
    let mut tt = TimedThing::new();
    printfmt!("Main is going to sleep now...");
    thread::sleep(Duration::from_secs(3));
    tt.process_input("This is a message from main".to_string());
    thread::sleep(Duration::from_millis(1500));
    tt.process_input("This is another message from main".to_string());
    thread::sleep(Duration::from_secs(5));
    printfmt!("Main is done, TimedThing is now being closed...");
    tt.close();
    printfmt!("Main is ending now...");
}
However, the above code leads to the error
error[E0507]: cannot move out of `*t` which is behind a shared reference
    --> src\main.rs:90:17
     |
90   |                 t.join().unwrap();
     |                 ^ ------ `*t` moved due to this method call
     |                 |
     |                 move occurs because `*t` has type `JoinHandle<()>`, which does not implement the `Copy` trait
     |
note: `JoinHandle::<T>::join` takes ownership of the receiver `self`, which moves `*t`
    --> C:\Users\Dani\.rustup\toolchains\stable-x86_64-pc-windows-msvc\lib/rustlib/src/rust\library\std\src\thread\mod.rs:1649:17
     |
1649 |     pub fn join(self) -> Result<T> {
     |                 ^^^^
It is clear that I can get this working if I leave out the implementation of `Drop` and change the function signature from `pub fn close(&mut self)` to `pub fn close(self)`, so that the function takes ownership of `self` and can pass ownership of the handle to `t.join()`. On the other hand, the implementation of `Drop` expects the function signature to take a mutable reference to `self`. I have tried several combinations, but it always fails on the same conflict: the `Drop` trait has to be implemented with a mutable reference to `self`, while the `join()` function of `JoinHandle<()>` wants to take ownership.
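To make the working variant concrete, here is a stripped-down sketch of what I mean by the `close(self)` version. `MiniTimer` is a hypothetical stand-in for `TimedThing` with the channel and the listener removed, the sleep interval is shortened, and `close` returns a `bool` only so that the outcome of the join is visible; none of that is in my real code. It compiles only as long as there is no `Drop` implementation on the struct.

```rust
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};
use std::time::Duration;

// Hypothetical, reduced stand-in for TimedThing: just the timer thread
// and its stop flag.
struct MiniTimer {
    timer: Option<JoinHandle<()>>,
    end_timer: Arc<Mutex<bool>>,
}

impl MiniTimer {
    fn new() -> Self {
        let end_timer = Arc::new(Mutex::new(false));
        let flag = Arc::clone(&end_timer);
        let timer = thread::spawn(move || {
            // Keep "ticking" until the stop flag is set.
            while !*flag.lock().unwrap() {
                thread::sleep(Duration::from_millis(10));
            }
        });
        MiniTimer {
            timer: Some(timer),
            end_timer,
        }
    }

    // Takes `self` by value, so the JoinHandle can be moved out of the
    // struct and handed to `join()`. This is only allowed because
    // MiniTimer does not implement Drop.
    fn close(self) -> bool {
        *self.end_timer.lock().unwrap() = true;
        match self.timer {
            Some(t) => t.join().is_ok(),
            None => false,
        }
    }
}

fn main() {
    let mt = MiniTimer::new();
    thread::sleep(Duration::from_millis(50));
    let joined = mt.close();
    println!("timer thread joined: {}", joined);
}
```

With this version the user has to call `close` explicitly; if the value is simply dropped, the timer thread is never told to stop, which is exactly the gap I want the `Drop` implementation to cover.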
Is there a way to get this working? I have read through a number of topics in this forum as well as Stack Overflow questions, but none of the suggestions given there seem to help in this scenario. I am sure I am missing some fundamental point.