Hello, I am new to rust and am looking for help with best practice around structuring rust code. I am trying to write a queue that invokes some action on a producer when a consumer dequeues it. At the moment I have implemented this using the callback pattern below, which allows the producer to pass its state to the consumer by separating it into a separate ProducerData
struct and passing a Arc
reference (note ProducerData
cannot be copied as it may later contain complex structs). Is it possible to do this in rust avoiding the callbacks and perhaps the separate ProducerData
struct ? I thought about storing Producer
or ProducerData
in the queue, however I want to support generic producers which may need to have different methods invoked (such as produce
and produce_hello
). Is it possible to somehow to this with a generic trait?
use std::collections::VecDeque;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
pub struct Queue<'a> {
items: Mutex<VecDeque<Box<dyn FnOnce() + Send + 'a>>>,
condvar: Condvar,
}
impl<'a> Queue<'a> {
pub fn new() -> Self {
Self {
condvar: Condvar::new(),
items: Mutex::new(VecDeque::new()),
}
}
pub fn enqueue(&self, cb: impl FnOnce() + Send + 'a) {
let mut q = self.items.lock().unwrap();
q.push_back(Box::new(cb));
println!("Enqueued item");
self.condvar.notify_one();
}
fn dequeue(&self) -> Box<dyn FnOnce() + Send + 'a> {
let mut q = self.items.lock().unwrap();
while q.is_empty() {
q = self.condvar.wait(q).unwrap();
}
q.pop_front().unwrap()
}
pub fn dispatch(&self) {
loop {
let cb = self.dequeue();
println!("Dequeued item");
cb();
}
}
}
struct ProducerData {
id: u32,
invoke_count: u32
}
impl ProducerData {
fn print_state(&self) {
println!("Producer {} invoked {} times! thread - {}",
self.id, self.invoke_count, thread::current().name().unwrap());
}
fn print_hello(&self) {
println!("Hello world from producer {}! thread - {}",
self.id, thread::current().name().unwrap());
}
}
struct Producer {
queue: Arc<Queue<'static>>,
data: Arc<Mutex<ProducerData>>
}
impl Producer {
fn new(id: u32, queue: Arc<Queue<'static>>) -> Self {
Self {
queue,
data: Arc::new(Mutex::new( ProducerData { id, invoke_count: 0 }))
}
}
fn print_state(&self) {
self.data.lock().unwrap().print_state();
}
fn produce(&self) {
let cb_data = Arc::clone(&self.data);
// Enqueuing on main thread
self.queue.enqueue(move || {
// Executed on consumer thread by dispatch method of queue
let mut data = cb_data.lock().unwrap();
data.invoke_count += 1;
data.print_state();
})
}
fn produce_hello(&self) {
let cb_data = Arc::clone(&self.data);
// Enqueuing on main thread
self.queue.enqueue(move || {
// Executed on consumer thread by dispatch method of queue
cb_data.lock().unwrap().print_hello();
})
}
}
fn main() {
let q = Arc::new(Queue::new());
let p1 = Producer::new(1, Arc::clone(&q));
let p2 = Producer::new(2, Arc::clone(&q));
// Consumer thread
let c = thread::Builder::new().name("consumer".to_string()).spawn(move
|| q.dispatch() ).unwrap();
// Produce on main thread
p1.produce();
p2.produce();
p2.produce();
p1.produce_hello();
// Can also access producer data from main thread/producer struct
p1.print_state();
c.join().unwrap();
}