Rust replacing callbacks with traits

Hello, I am new to rust and am looking for help with best practice around structuring rust code. I am trying to write a queue that invokes some action on a producer when a consumer dequeues it. At the moment I have implemented this using the callback pattern below, which allows the producer to pass its state to the consumer by separating it into a separate ProducerData struct and passing a Arc reference (note ProducerData cannot be copied as it may later contain complex structs). Is it possible to do this in rust avoiding the callbacks and perhaps the separate ProducerData struct ? I thought about storing Producer or ProducerData in the queue, however I want to support generic producers which may need to have different methods invoked (such as produce and produce_hello). Is it possible to somehow to this with a generic trait?

use std::collections::VecDeque;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

pub struct Queue<'a> {
   items: Mutex<VecDeque<Box<dyn FnOnce() + Send + 'a>>>,
   condvar: Condvar,
}

impl<'a> Queue<'a> {
   pub fn new() -> Self {
       Self {
           condvar: Condvar::new(),
           items: Mutex::new(VecDeque::new()),
       }
   }

   pub fn enqueue(&self, cb: impl FnOnce() + Send + 'a) {
       let mut q = self.items.lock().unwrap();
       q.push_back(Box::new(cb));
       println!("Enqueued item");
       self.condvar.notify_one();
   }

   fn dequeue(&self) -> Box<dyn FnOnce() + Send + 'a> {
       let mut q = self.items.lock().unwrap();

       while q.is_empty() {
           q = self.condvar.wait(q).unwrap();
       }

       q.pop_front().unwrap()
   }

   pub fn dispatch(&self) {
       loop {
           let cb = self.dequeue();
           println!("Dequeued item");
           cb();
       }
   }
}

struct ProducerData {
   id: u32,
   invoke_count: u32
}

impl ProducerData {
   fn print_state(&self) {
       println!("Producer {} invoked {} times! thread - {}",
                self.id, self.invoke_count, thread::current().name().unwrap());
   }

   fn print_hello(&self) {
       println!("Hello world from producer {}! thread - {}",
                self.id, thread::current().name().unwrap());
   }
}

struct Producer {
   queue: Arc<Queue<'static>>,
   data: Arc<Mutex<ProducerData>>
}

impl Producer {
   fn new(id: u32, queue: Arc<Queue<'static>>) -> Self {
       Self {
           queue,
           data: Arc::new(Mutex::new( ProducerData { id, invoke_count: 0 }))
       }
   }

   fn print_state(&self) {
       self.data.lock().unwrap().print_state();
   }

   fn produce(&self) {
       let cb_data = Arc::clone(&self.data);
       // Enqueuing on main thread
       self.queue.enqueue(move || {
           // Executed on consumer thread by dispatch method of queue
           let mut data = cb_data.lock().unwrap();
           data.invoke_count += 1;
           data.print_state();
       })
   }

   fn produce_hello(&self) {
       let cb_data = Arc::clone(&self.data);
       // Enqueuing on main thread
       self.queue.enqueue(move || {
           // Executed on consumer thread by dispatch method of queue
           cb_data.lock().unwrap().print_hello();
       })
   }
}

fn main() {
   let q = Arc::new(Queue::new());
   let p1 = Producer::new(1, Arc::clone(&q));
   let p2 = Producer::new(2, Arc::clone(&q));

   // Consumer thread
   let c = thread::Builder::new().name("consumer".to_string()).spawn(move
       || q.dispatch() ).unwrap();

   // Produce on main thread
   p1.produce();
   p2.produce();
   p2.produce();
   p1.produce_hello();

   // Can also access producer data from main thread/producer struct
   p1.print_state();

   c.join().unwrap();
}

The approach with seperating the ProducerData and using a callback seems reasonable to me. If you want the option to avoid the Box around and dynamic dispatch for dyn FnOnce, you could make the Queue generic on the closure type. Well acutally, first of all, I’d suggest using some kind of channel type for better efficiency:

Rust Playground

then, well.., there’s really just the dispatch function left here; that one can be easily generalized to

fn dispatch<F>(receiver: channel::Receiver<F>)
where
    F: FnOnce() + Send,
{
    while let Ok(cb) = receiver.recv() {
        println!("Dequeued item");
        cb();
    }
}

Rust Playground


Now, I was trying to demonstrate how this could be used to avoid the Box and the dyn for implementing Producer... but I ran into the problem that it isn’t possible to name closure types explicitly in Rust (yet), so I guess we could first create a Callback trait to mitigate this:

trait Callback: Send {
    fn call(self);
}

// with this impl, the `Box<dyn FnOnce() + Send>` type still works
// so we *just* generalized the type of `dispatch` below
impl<F> Callback for F
where
    F: FnOnce() + Send,
{
    fn call(self) {
        self()
    }
}

fn dispatch<C: Callback>(receiver: channel::Receiver<C>)
{
    while let Ok(cb) = receiver.recv() {
        println!("Dequeued item");
        cb.call();
    }
}

Rust Playground

Now let’s use this to avoid the extra boxing and dynamic dispatching:

Rust Playground


This answer is kind not really addressing your main questions too much except for saying that the callback approach looks fine to me. So I’ll admit that it’s more of a general code review other than that. Hope it helps you in some way anyways.

You say you want to

that statement is a little bit vague. The dispatch function as presented above does support generic callbacks, which might be enough. Creating a trait for a generic producer could then help associating the callback type to the producer; but at long as it would do only that, such a trait seems a bit pointless. Perhaps if you’re more specific about what kinds of operations you’d like to be able to perform on a “generic producer”, there could be a useful trait abstraction coming out of this.

By the way, note that by using (or just replicating the function body of) this helper function

fn dyn_callback<'a, C: Callback + 'a>(c: C) -> Box<dyn FnOnce() + Send + 'a> {
    Box::new(|| c.call())
}

you can still use the producer::ProducerCallback type, or any Callback-implementing type, in a heterogenious queue of Box<dyn FnOnce() + Send + 'a>, though in the case of the producer::Producer implemetation in the playground, it’s now using both dynamic dispatching plus “dispatching” on the Kind enum, so perhaps some (minor) losses in efficiency.

1 Like

Thanks @steffahn, the main reason that I was looking to avoid the callback approach is that it does not seem very much like idiomatic rust to me (I don't see callbacks heavily used elsewhere in rust libraries). The changes you have suggested do make it a lot cleaner though.

In my application the flow would be something like this:

  • IO event occurs (from epoll) using a library such as mio.
  • Different structs have registered their interest in different IO events by registering callbacks with the event loop. For example a TCP server has registered a TCP listener.
  • TCP listener event fired, event loop looks up associated callback for the event and invokes TCP server callback.
  • The TCP server callback does minimal work, it just pushes a callback onto a queue because it does not want to block the event loop thread.
  • The dispatch thread dequeues the callback and accepts the connection and stores it in a vector in the server struct.

The method of callbacks invoking further callbacks seems like an anti-pattern in rust to me as it involves many uses of Arc::clone(). Would it be possible instead for the TCP server to implement a trait, and an Arc<> of that trait is stored in the queue?

I have attempted it below without Arc to show a general idea of what I mean, however this does not work due to the producer not living long enough:

use std::collections::VecDeque;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

pub struct Queue<'a> {
    items: Mutex<VecDeque<Box<dyn QueueTrait + Send + 'a>>>,
    condvar: Condvar,
}

impl<'a> Queue<'a> {
    pub fn new() -> Self {
        Self {
            condvar: Condvar::new(),
            items: Mutex::new(VecDeque::new()),
        }
    }

    pub fn enqueue(&self, qt: impl QueueTrait + Send + 'a) {
        let mut q = self.items.lock().unwrap();
        q.push_back(Box::new(qt));
        println!("Enqueued item");
        self.condvar.notify_one();
    }

    fn dequeue(&self) -> Box<dyn QueueTrait + Send + 'a> {
        let mut q = self.items.lock().unwrap();

        while q.is_empty() {
            q = self.condvar.wait(q).unwrap();
        }

        q.pop_front().unwrap()
    }

    pub fn dispatch(&self) {
        loop {
            let qt = self.dequeue();
            println!("Dequeued item");
            qt.invoke();
        }
    }
}

struct ProducerData {
    id: u32,
    invoke_count: u32
}

impl ProducerData {
    fn print_state(&self) {
        println!("Producer {} invoked {} times! thread - {}",
                 self.id, self.invoke_count, thread::current().name().unwrap());
    }

    fn print_hello(&self) {
        println!("Hello world from producer {}! thread - {}",
                 self.id, thread::current().name().unwrap());
    }
}

struct Producer {
    queue: Arc<Queue<'static>>,
    data: Arc<Mutex<ProducerData>>
}

impl Producer {
    fn new(id: u32, queue: Arc<Queue<'static>>) -> Self {
        Self {
            queue,
            data: Arc::new(Mutex::new( ProducerData { id, invoke_count: 0 }))
        }
    }

    fn print_state(&self) {
        self.data.lock().unwrap().print_state();
    }

    fn produce(&self) {
        self.queue.enqueue(self)
    }


     // TODO: invoking different methods on same struct from queue trait not supported
    fn produce_hello(&self) {
       // let cb_data = Arc::clone(&self.data);
       // Enqueuing on main thread
       // self.queue.enqueue(move || {
            // Executed on consumer thread by dispatch method of queue
       //    cb_data.lock().unwrap().print_hello();
       //})
    }
}

pub trait QueueTrait {
    fn invoke(&self);
}

impl QueueTrait for &Producer {
    fn invoke(&self) {
        let mut data = self.data.lock().unwrap();
        data.invoke_count += 1;
        data.print_state();
    }
}

fn main() {
    let q = Arc::new(Queue::new());
    let p1 = Producer::new(1, Arc::clone(&q));
    let p2 = Producer::new(2, Arc::clone(&q));

    // Consumer thread
    let c = thread::Builder::new().name("consumer".to_string()).spawn(move
        || q.dispatch() ).unwrap();

    // Produce on main thread
    p1.produce();
    p2.produce();
    p2.produce();
    p1.produce_hello();

    // Can also access producer data from main thread/producer struct
    p1.print_state();

    c.join().unwrap();
}

Thanks

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.