How to make an EventBus using a list of FnMut

I'm writing a game server from scratch. It has a Game module that is responsible for the game state and logic, and a WSServer module that is responsible for all the websocket connections and associated logic (using the ws crate). I wanted to keep these two modules agnostic of each other, so I decided to create an EventBus module that they could both post and subscribe to. Here's the code for that:

pub struct EventBus<'a, E> 
where E: Copy {
  pub subscribers: Vec<&'a mut dyn FnMut(E)>,
}

impl<'a, E> EventBus<'a, E> 
where E: Copy {
  pub fn new() -> EventBus<'a, E> {
    EventBus {
      subscribers: Vec::new(),
    }
  }

  pub fn post_event(&mut self, event: E) {
    for sub in self.subscribers.iter_mut() {
      sub(event);
    }
  }

  pub fn subscribe(&mut self, subscriber: &'a mut dyn FnMut(E)) {
    self.subscribers.push(subscriber);
  }
}

It's really simple: if WSServer needs to know when a certain kind of game event occurs, it can just subscribe to the EventBus for that particular event. Then, whenever that event occurs, Game calls post_event(my_event), and the callback that WSServer registered gets invoked.

EventBus compiles fine. My problems start when I try to use it. Specifically, when calling subscribe() in my tests, I always seem to get various errors no matter what I try.

Here's an example with some test attempts.
https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=0d8f7a6c0c2d266d099ba6de30745b8d

I'm starting to think maybe I'm violating some meta-level Rust design principle, and this sort of thing is prevented by design. What am I doing wrong?

You should probably use owned callbacks (Box<dyn FnMut(E)>) in EventBus, not borrowed callbacks &mut dyn FnMut(E). That will take care of the "temporary dropped while borrowed" issues you're seeing.
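For illustration, here is a minimal sketch of what the owned-callback version might look like. The struct and method names mirror the original post; the run_demo function and the Rc<Cell<bool>> flag are hypothetical and only there to exercise the bus. One thing to keep in mind: with a plain Box<dyn FnMut(E)> field, a closure has to own whatever it captures, hence the move and the shared Rc.

```rust
use std::cell::Cell;
use std::rc::Rc;

pub struct EventBus<E: Copy> {
    // Each subscriber is an owned, heap-allocated closure.
    pub subscribers: Vec<Box<dyn FnMut(E)>>,
}

impl<E: Copy> EventBus<E> {
    pub fn new() -> Self {
        EventBus { subscribers: Vec::new() }
    }

    pub fn subscribe(&mut self, subscriber: Box<dyn FnMut(E)>) {
        self.subscribers.push(subscriber);
    }

    pub fn post_event(&mut self, event: E) {
        for sub in self.subscribers.iter_mut() {
            sub(event);
        }
    }
}

// Hypothetical demo, not part of the original post.
pub fn run_demo() -> bool {
    let alerted = Rc::new(Cell::new(false));
    // The closure gets its own handle to the flag and takes ownership of it.
    let flag = Rc::clone(&alerted);

    let mut bus = EventBus::<i32>::new();
    bus.subscribe(Box::new(move |_| flag.set(true)));
    bus.post_event(6);

    alerted.get()
}
```

Because the Box is moved into the bus, there is no temporary left behind for the bus to borrow from.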


Thank you. I like using Box here; it feels much simpler.

My first test now works (yay!), but I'm still having trouble with my second test.

  #[test]
  fn calling_post_event_alerts_all_subscribers() {
    let subscriber_a_was_alerted = Mutex::new(false);
    let subscriber_b_was_alerted = Mutex::new(false);
    { // try to scope eventBus so that subscriber_a/b_was_alerted outlives it
      let mut eventBus = EventBus::<i32>::new();
      eventBus.subscribe(Box::new(|_: i32|{
        let mut subscriber_a_was_alerted = subscriber_a_was_alerted.lock().unwrap();
        *subscriber_a_was_alerted = true;
      }));
      
      eventBus.subscribe(Box::new(|_: i32|{
        let mut subscriber_b_was_alerted = subscriber_b_was_alerted.lock().unwrap();
        *subscriber_b_was_alerted = true;
      }));
    
      eventBus.post_event(6);
    }

    assert!(*subscriber_a_was_alerted.lock().unwrap());
    assert!(*subscriber_b_was_alerted.lock().unwrap());
  }

I'm getting the error 'subscriber_a_was_alerted' does not live long enough, and the same for subscriber_b_was_alerted.

Here's the playground: Rust Playground

Is this because the closures are in Boxes now which live on the heap? How do I make those two captured variables live long enough?

If you want the owned callbacks to be able to capture references to their calling environment, like subscriber_a_was_alerted, you'll need to add an explicit lifetime bound to the callback type and EventBus, like this:

pub struct EventBus<'b, E> {
    pub subscribers: Vec<Box<dyn FnMut(E) + 'b>>,
}

This is because Box<dyn FnMut(E)> is effectively an abbreviation for Box<dyn FnMut(E) + 'static>, which says that the callbacks may not capture any data by non-'static reference. (The difference from your initial EventBus, which also had a lifetime parameter, is that your original code said that the closure's environment data as a whole was borrowed by EventBus, whereas this says that the environment data is owned, but that owned data may include non-'static references.)
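As a rough sketch of the difference (the sum_events function and the Cell counter are made up for this example, not taken from your code), a boxed closure can borrow a local once the bound is 'b instead of the implicit 'static:

```rust
use std::cell::Cell;

pub struct EventBus<'b, E> {
    // The closure is owned by the bus, but may hold references valid for 'b.
    pub subscribers: Vec<Box<dyn FnMut(E) + 'b>>,
}

impl<'b, E: Copy> EventBus<'b, E> {
    pub fn new() -> Self {
        EventBus { subscribers: Vec::new() }
    }

    pub fn subscribe(&mut self, subscriber: Box<dyn FnMut(E) + 'b>) {
        self.subscribers.push(subscriber);
    }

    pub fn post_event(&mut self, event: E) {
        for sub in self.subscribers.iter_mut() {
            sub(event);
        }
    }
}

// Hypothetical demo: adds up every posted event in a local variable
// that the closure only borrows.
pub fn sum_events(events: &[i32]) -> i32 {
    // Declared before the bus, so it outlives the closure that borrows it.
    let total = Cell::new(0);

    let mut bus = EventBus::<i32>::new();
    // No `move`: the closure captures `&total`, which is not 'static.
    bus.subscribe(Box::new(|e: i32| total.set(total.get() + e)));

    for &e in events {
        bus.post_event(e);
    }
    total.get()
}
```

If the struct field were Box<dyn FnMut(E)> without the + 'b, the subscribe call in sum_events would be rejected because the closure borrows total.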

You don't need the extra block to limit the scope of eventBus; the compiler is smart enough that adding it won't make a difference to whether the borrow check succeeds.


It worked! Thanks for the great explanation too.

For anyone curious, here's the final working EventBus:

pub struct EventBus<'b, E> 
where E: Copy {
  pub subscribers: Vec<Box<dyn FnMut(E) + 'b>>,
}

impl<'b, E> EventBus<'b, E>
where E: Copy {
  pub fn new() -> EventBus<'b, E> {
    EventBus {
      subscribers: Vec::new(),
    }
  }

  pub fn post_event(&mut self, event: E) {
    for sub in self.subscribers.iter_mut() {
      sub(event);
    }
  }

  pub fn subscribe(&mut self, subscriber: Box<dyn FnMut(E) + 'b>) {
    self.subscribers.push(subscriber);
  }
}

// Tests
#[cfg(test)]
mod tests {
  use std::sync::Mutex;
  use super::*;

  #[test]
  fn constructor_works() {
    EventBus::<i32>::new();
  }

  #[test]
  fn subscribe_adds_subscriber() {
    // arrange
    let mut event_bus = EventBus::<i32>::new();
    // act
    event_bus.subscribe(Box::new(|_|{print!("heyo!")}));
    // assert
    assert_eq!(1, event_bus.subscribers.len());
  }

  #[test]
  fn calling_post_event_alerts_all_subscribers() {
    let subscriber_a_was_alerted = Mutex::new(false);
    let subscriber_b_was_alerted = Mutex::new(false);

    let mut event_bus = EventBus::<i32>::new();
    event_bus.subscribe(Box::new(|_: i32|{
      let mut subscriber_a_was_alerted = subscriber_a_was_alerted.lock().unwrap();
      *subscriber_a_was_alerted = true;
    }));
    
    event_bus.subscribe(Box::new(|_: i32|{
      let mut subscriber_b_was_alerted = subscriber_b_was_alerted.lock().unwrap();
      *subscriber_b_was_alerted = true;
    }));
  
    event_bus.post_event(6);

    // Assert on the flags themselves; assert!(true, ...) would always pass.
    assert!(*subscriber_a_was_alerted.lock().unwrap());
    assert!(*subscriber_b_was_alerted.lock().unwrap());
  }
} 

And the playground: Rust Playground
