Observer pattern in Rust

If this isn’t so performance-critical that you can only use memory on the stack, consider using Rc/Arc instead. This will make all references easier to work with.

Is it a requirement to use (i.e. modify) stack variables from the observer? This is basically what @kornel is asking. If not, you can put the shared state into an Rc<RefCell<...>> (or Rc<Cell<...>>) and keep it alive that way.
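
A minimal sketch of that suggestion, assuming a single-threaded setup. The names Observable, subscribe, set and record_changes are made up for illustration and are not from the original poster's code:

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// Hypothetical observable: owns a value and a list of boxed callbacks.
pub struct Observable<T> {
    value: T,
    callbacks: Vec<Box<dyn FnMut(&T)>>,
}

impl<T> Observable<T> {
    pub fn new(value: T) -> Self {
        Observable { value, callbacks: Vec::new() }
    }

    /// The callback must be 'static, so it may own an Rc but cannot borrow a local.
    pub fn subscribe(&mut self, callback: impl FnMut(&T) + 'static) {
        self.callbacks.push(Box::new(callback));
    }

    /// Stores the new value and notifies every callback.
    pub fn set(&mut self, value: T) {
        self.value = value;
        for cb in self.callbacks.iter_mut() {
            cb(&self.value);
        }
    }
}

/// Usage sketch: records every value the observer sees in shared state.
pub fn record_changes() -> Vec<i32> {
    let seen = Rc::new(RefCell::new(Vec::<i32>::new()));
    let mut observable = Observable::new(0i32);

    // The closure owns a clone of the Rc, which keeps the shared state alive.
    let seen_in_callback = Rc::clone(&seen);
    observable.subscribe(move |v: &i32| seen_in_callback.borrow_mut().push(*v));

    observable.set(1);
    observable.set(2);

    let result = seen.borrow().clone();
    result
}
```

The caller keeps its own Rc and reads the state whenever it likes; nothing on the stack is borrowed by the callback.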

I meant channel in the abstract sense - haven’t thought about the particular implementation.

No, definitely not.

I'll try this way. Thank you!

I eventually tried to implement Observer using raw pointers to store the callbacks.

pub struct Observable<T> {
    value: T,
    callbacks: Vec<*const FnMut(&T)>,
}

However, when I try to add a callback:

impl <T> Observable<T> {
    // 's (callback) lifetime is shorter than 'l (self) lifetime
    pub fn subscribe<'s, 'l: 's, C>(&'l self, callback: C) where C: FnMut(&T) + 's {
        let ptr = &callback as *const C;
        self.callbacks.push(ptr);
    }
}

Compilation fails with error[E0310]: the parameter type C may not live long enough.

It seems that Vec<*const FnMut(&T)> in the struct definition puts an implicit 'static bound on the FnMut trait object.

Is there a way to store a raw pointer to an FnMut without a lifetime?

Did you try the Rc<RefCell<...>> approach?

What you have above with the raw ptr is incorrect - the callback is moved into the subscribe method and will be dropped at the end of the method if nothing retakes ownership of it. If you just add a raw ptr of it to the Vec you’ll be holding a dangling pointer.

No, since I’m planning to return a Subscription from this method (which will own the passed callback).
With the Rc<RefCell<..>> approach I would return Rc<RefCell<Subscription>>, which seemed like a weird API to me, so I decided to try raw pointers first.

The intention was to store raw pointers, since they carry no lifetime guarantees and place no restriction on the callbacks' lifetime. However, I can’t find any way to store a Vec of callbacks without mentioning a lifetime.

You can try hiding that inside some type that internally has the Rc and RefCell and return that type. This is a common approach in Rust where you have a Foo type that your API exposes and then Foo has some Inner type that contains the guts.
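
Roughly what I have in mind, as a sketch only. Observable, Inner, Subscription, notify and count_notifications are placeholder names, and the id-based bookkeeping is just one assumed way to find the callback again:

```rust
use std::cell::RefCell;
use std::rc::{Rc, Weak};

type Callback<T> = Box<dyn FnMut(&T)>;

/// The guts: hidden behind Rc<RefCell<..>> and never exposed to callers.
struct Inner<T> {
    next_id: usize,
    callbacks: Vec<(usize, Callback<T>)>,
}

/// Public handle (hypothetical name).
pub struct Observable<T> {
    inner: Rc<RefCell<Inner<T>>>,
}

/// Returned by `subscribe`; removes its callback when dropped.
pub struct Subscription<T> {
    id: usize,
    inner: Weak<RefCell<Inner<T>>>,
}

impl<T> Observable<T> {
    pub fn new() -> Self {
        let inner = Inner { next_id: 0, callbacks: Vec::new() };
        Observable { inner: Rc::new(RefCell::new(inner)) }
    }

    pub fn subscribe(&self, callback: impl FnMut(&T) + 'static) -> Subscription<T> {
        let mut inner = self.inner.borrow_mut();
        let id = inner.next_id;
        inner.next_id += 1;
        let boxed: Callback<T> = Box::new(callback);
        inner.callbacks.push((id, boxed));
        Subscription { id, inner: Rc::downgrade(&self.inner) }
    }

    /// Calls every registered callback. Callbacks must not subscribe or
    /// unsubscribe re-entrantly, or the RefCell borrow will panic.
    pub fn notify(&self, value: &T) {
        for (_, cb) in self.inner.borrow_mut().callbacks.iter_mut() {
            cb(value);
        }
    }
}

impl<T> Drop for Subscription<T> {
    fn drop(&mut self) {
        // If the observable is already gone there is nothing to remove.
        if let Some(inner) = self.inner.upgrade() {
            inner.borrow_mut().callbacks.retain(|(id, _)| *id != self.id);
        }
    }
}

/// Usage sketch: the second notification arrives after the subscription is dropped.
pub fn count_notifications() -> usize {
    let hits = Rc::new(RefCell::new(0usize));
    let observable: Observable<i32> = Observable::new();

    let hits_in_callback = Rc::clone(&hits);
    let subscription = observable.subscribe(move |_: &i32| *hits_in_callback.borrow_mut() += 1);

    observable.notify(&1); // delivered
    drop(subscription); // unsubscribes
    observable.notify(&2); // nobody is listening any more

    let total = *hits.borrow();
    total
}
```

The API only ever shows Observable and Subscription; the Rc and RefCell stay an implementation detail.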

But who’s going to own the closure if the Vec only has a raw ptr to it? You can allocate the closure on the heap and then leak it and store the ptr in the Vec. You’d need to find a place in code to drop that closure manually so you don’t leak forever.

If you allocate a closure on the heap then you just pass a Box to subscribe, leak the box there and store the ptr. This closure would have no lifetime parameters - Box<FnMut...> is essentially Box<FnMut... + 'static>.
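
A sketch of the leak-and-store idea, under the assumption that the observable itself is the place that frees the closures again. The names and the id-based unsubscribe are hypothetical:

```rust
use std::cell::Cell;
use std::rc::Rc;

type RawCallback<T> = *mut dyn FnMut(&T);

/// Hypothetical observable that stores raw pointers to leaked, boxed closures.
pub struct Observable<T> {
    next_id: usize,
    callbacks: Vec<(usize, RawCallback<T>)>,
}

impl<T> Observable<T> {
    pub fn new() -> Self {
        Observable { next_id: 0, callbacks: Vec::new() }
    }

    /// Takes ownership of the box, leaks it into a raw pointer and returns an id.
    pub fn subscribe(&mut self, callback: Box<dyn FnMut(&T)>) -> usize {
        let id = self.next_id;
        self.next_id += 1;
        self.callbacks.push((id, Box::into_raw(callback)));
        id
    }

    /// Removes the pointer from the list and frees the closure behind it.
    pub fn unsubscribe(&mut self, id: usize) {
        if let Some(pos) = self.callbacks.iter().position(|(other, _)| *other == id) {
            let (_, ptr) = self.callbacks.remove(pos);
            // SAFETY: `ptr` came from Box::into_raw in `subscribe` and has just
            // been removed from the list, so it is freed exactly once.
            drop(unsafe { Box::from_raw(ptr) });
        }
    }

    pub fn notify(&mut self, value: &T) {
        for &(_, ptr) in &self.callbacks {
            // SAFETY: every pointer still in the list points to a live closure.
            let callback = unsafe { &mut *ptr };
            callback(value);
        }
    }
}

impl<T> Drop for Observable<T> {
    fn drop(&mut self) {
        // Free whatever was never unsubscribed, so nothing leaks forever.
        for (_, ptr) in self.callbacks.drain(..) {
            // SAFETY: same reasoning as in `unsubscribe`.
            drop(unsafe { Box::from_raw(ptr) });
        }
    }
}

/// Usage sketch: returns the hit count and the Rc strong count afterwards.
pub fn leak_and_reclaim() -> (u32, usize) {
    let hits = Rc::new(Cell::new(0u32));
    let mut observable: Observable<i32> = Observable::new();

    let hits_in_callback = Rc::clone(&hits);
    let id = observable.subscribe(Box::new(move |_: &i32| {
        hits_in_callback.set(hits_in_callback.get() + 1)
    }));

    observable.notify(&1);
    observable.unsubscribe(id); // closure is dropped here
    observable.notify(&2);

    (hits.get(), Rc::strong_count(&hits))
}
```

Note that the closure still has to be 'static here, so this alone does not let it borrow stack variables.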

The Subscription: it will implement Drop and remove the raw pointer to itself from the observable when it is dropped.

Ah ok, I was going by your sample code a bit too much but ok - sounds doable.

Edit: re-reading this thread from the top, I see you want to have a closure that captures a reference to a stack variable. You’ll need to transmute your closure to have a 'static lifetime before adding to the Vec. That’s obviously a lie we’re telling the compiler (and hence unsafe is required). So just be careful in how you use this code to make sure you don’t hit UB due to dangling references. I’d still explore a safe approach before resorting to this but you’ll know better.
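
For completeness, a sketch of what that transmute could look like. It is specialised to i32 values to keep it short; subscribe_unchecked, clear and count_on_stack are made-up names, and the safety contract in the comments is my assumption about how you would have to use it:

```rust
use std::mem;

type StaticCallback = Box<dyn FnMut(&i32) + 'static>;

/// Hypothetical observable, specialised to `i32` values to keep the sketch short.
pub struct Observable {
    callbacks: Vec<StaticCallback>,
}

impl Observable {
    pub fn new() -> Self {
        Observable { callbacks: Vec::new() }
    }

    /// Stores a callback that may borrow from the caller's stack.
    ///
    /// # Safety
    /// The caller must call `clear` (or drop the observable) before anything
    /// the callback borrows is moved, dropped or accessed again.
    pub unsafe fn subscribe_unchecked<'a>(&mut self, callback: Box<dyn FnMut(&i32) + 'a>) {
        // The lie: pretend the closure borrows nothing with a limited lifetime.
        let erased: StaticCallback =
            unsafe { mem::transmute::<Box<dyn FnMut(&i32) + 'a>, StaticCallback>(callback) };
        self.callbacks.push(erased);
    }

    pub fn notify(&mut self, value: &i32) {
        for cb in self.callbacks.iter_mut() {
            cb(value);
        }
    }

    /// Drops all callbacks, ending every borrow they were holding.
    pub fn clear(&mut self) {
        self.callbacks.clear();
    }
}

/// Usage sketch: the callback mutates a counter that lives on the stack.
pub fn count_on_stack() -> u32 {
    let mut counter = 0u32;
    let mut observable = Observable::new();

    // SAFETY: `clear` is called below before `counter` is read again, and
    // `observable` is declared after `counter`, so it is dropped first.
    unsafe { observable.subscribe_unchecked(Box::new(|_: &i32| counter += 1)) };

    observable.notify(&1);
    observable.notify(&2);
    observable.clear();
    counter
}
```

The compiler no longer checks any of this, so forgetting the clear call, or touching counter while the callback is still registered, is undefined behaviour.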

Could you please explain what exactly you are trying to achieve with your Observer? I ask because I want to think about if there is an overall better solution in Rust.

I’m trying to create some primitives for UI state.
Imagine a UI form with some inputs and a view-model for it.
Let’s assume we have to block submission if the user has entered invalid data.

I want to have an observable model with auto-derived parts (e.g. you change an input and the validation is automatically recomputed).

Did you ever get to an acceptable answer for this question?

I have tried several of the ideas suggested here, but unfortunately I didn’t manage to get code that compiles.
Every time I try a new approach, I run into new compilation issues.

To give a summary for anyone who finds this topic, and to mix in my own experience: here are two working implementations of patterns that behave like the observer pattern:

Observer has its own thread

If the observing objects have their own worker threads, I prefer to use sync::mpsc channels. The Observable holds a list of mpsc::Senders. If an Observer goes out of scope, Sender::send() will fail and the Sender will be removed from the Observable's list.

Sharing Weak references

If the Observable's thread should call the update function, I prefer giving the Observable a list of Weak<Mutex<Observer>> (or RefCell, RwLock, depending on the use case). The Weak reference decouples Observable and Observer. As in the channel solution, a reference is removed if upgrading it fails. In this case Observer is a trait with a function like update() that the Observable calls for each element of its list of listeners.
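
A compact sketch of the Weak variant, not my production code. Logger, register, observer_count and weak_observer_demo are illustrative names, and the event type is simply &str here:

```rust
use std::sync::{Arc, Mutex, Weak};

pub trait Observer {
    fn update(&mut self, event: &str);
}

type SharedObserver = Mutex<dyn Observer + Send>;

pub struct Observable {
    observers: Vec<Weak<SharedObserver>>,
}

impl Observable {
    pub fn new() -> Self {
        Observable { observers: Vec::new() }
    }

    pub fn register(&mut self, observer: Weak<SharedObserver>) {
        self.observers.push(observer);
    }

    /// Notifies living observers and removes entries whose observer is gone.
    pub fn notify(&mut self, event: &str) {
        self.observers.retain(|weak| match weak.upgrade() {
            Some(observer) => {
                observer.lock().unwrap().update(event);
                true
            }
            None => false,
        });
    }

    pub fn observer_count(&self) -> usize {
        self.observers.len()
    }
}

/// Example observer that just remembers the events it received.
struct Logger {
    lines: Vec<String>,
}

impl Observer for Logger {
    fn update(&mut self, event: &str) {
        self.lines.push(event.to_string());
    }
}

/// Usage sketch: returns the logged lines and the number of observers left.
pub fn weak_observer_demo() -> (Vec<String>, usize) {
    let mut observable = Observable::new();
    let logger = Arc::new(Mutex::new(Logger { lines: Vec::new() }));

    // Coerce to the trait object type, register a Weak, drop the extra Arc.
    let as_observer: Arc<SharedObserver> = logger.clone();
    observable.register(Arc::downgrade(&as_observer));
    drop(as_observer); // `logger` still keeps the observer alive

    observable.notify("first");
    let lines = logger.lock().unwrap().lines.clone();

    drop(logger); // last strong reference is gone
    observable.notify("second"); // upgrade fails, the entry is removed

    (lines, observable.observer_count())
}
```

The Observable never keeps an observer alive; whoever holds the Arc decides how long it lives.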

Using Closures

Like the author of this topic, I haven’t found a way to store closures in the Observable. Maybe there would be a way by moving Weak references inside the closures. If someone has a solution for this, I would be very interested in it.
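
One possible shape for that idea, as a single-threaded sketch I have not used in a real project. The convention that a callback returns false to unsubscribe itself is an assumption of this sketch, as are all the names:

```rust
use std::cell::RefCell;
use std::rc::{Rc, Weak};

/// Hypothetical observable whose callbacks return `true` to stay subscribed.
pub struct Observable<T> {
    callbacks: Vec<Box<dyn FnMut(&T) -> bool>>,
}

impl<T> Observable<T> {
    pub fn new() -> Self {
        Observable { callbacks: Vec::new() }
    }

    pub fn subscribe<F>(&mut self, callback: F)
    where
        F: FnMut(&T) -> bool + 'static,
    {
        self.callbacks.push(Box::new(callback));
    }

    /// Calls every callback and drops those that report they are done.
    pub fn notify(&mut self, value: &T) {
        self.callbacks.retain_mut(|cb| cb(value));
    }

    pub fn subscriber_count(&self) -> usize {
        self.callbacks.len()
    }
}

/// Usage sketch: returns the values seen and the number of callbacks left.
pub fn weak_closure_demo() -> (Vec<i32>, usize) {
    let mut observable: Observable<i32> = Observable::new();
    let log = Rc::new(RefCell::new(Vec::<i32>::new()));

    // The closure owns only a Weak, so it does not keep `log` alive.
    let weak_log: Weak<RefCell<Vec<i32>>> = Rc::downgrade(&log);
    observable.subscribe(move |value: &i32| match weak_log.upgrade() {
        Some(log) => {
            log.borrow_mut().push(*value);
            true
        }
        None => false, // observer state is gone: ask to be removed
    });

    observable.notify(&1);
    let seen = log.borrow().clone();

    drop(log); // the observer's state goes away
    observable.notify(&2); // upgrade fails, the closure is removed

    (seen, observable.subscriber_count())
}
```

The closure itself is 'static because it owns the Weak, which sidesteps the lifetime problem from the start of the thread at the cost of one reference-counted allocation per observer.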

Could you please share the code of implementation that uses Weak references approach?

Well, you already posted the implementation I use (sorry for skipping my sources):

As you said it is not the most elegant way but it works.
Indeed my implementation for the channel-solution is based on this post:

//! Implementation of the Observer-Pattern.
//! Original code by 'locka' @ https://stackoverflow.com/questions/37572734/how-can-i-implement-the-observer-pattern-in-rust
//! Modified by farnbams: Substitute Listener by mpsc::Sender

use std::sync::mpsc::Sender;
// Assumption: the `debug!` macro used below comes from the `log` crate.
use log::debug;

pub trait Dispatchable<T>
    where T: Clone
{
    fn register_channel(&mut self, tx: Sender<T>);
}

pub struct Dispatcher<T>
    where T: Clone
{
    senders: Vec<(Sender<T>, bool)>,
}

impl<T> Dispatchable<T> for Dispatcher<T>
    where T: Clone
{
    /// Registers a new Sender
    fn register_channel(&mut self, tx: Sender<T>) {
        self.senders.push((tx, true));
    }
}

impl<T> Dispatcher<T>
    where T: Clone
{
    pub fn new() -> Dispatcher<T> {
        Dispatcher { senders: Vec::new() }
    }

    pub fn num_senders(&self) -> usize {
        self.senders.len()
    }

    pub fn dispatch(&mut self, msg: &T) {
        let mut cleanup = false;
        // Send messages
        for (s, living) in self.senders.iter_mut() {
            if s.send(msg.clone()).is_err() {
                cleanup = true;
                *living = false;
                debug!("Receiver dropped => Clean up sender-list");
            }
        }
        // If there were invalid senders, clean up the list
        if cleanup {
            debug!("Dispatcher is cleaning up dead senders");
            // Only retain living channels
            self.senders.retain(|(_, living)| *living);
        }
    }
}

So, I went ahead and tried this, with a reference type I called Sc.

Basically, using this reference type, you can write code like this:

extern crate sc;

use sc::Dropper;
use sc::Sc;

struct Visitable {
    observers: Vec<Sc<Fn(&str) + 'static>>,
}

impl Visitable {
    pub fn new(observer_count: usize) -> Self {
        let mut v = Vec::new();
        v.reserve(observer_count);
        for _i in 0..observer_count {
            v.push(Sc::new());
        }
        Visitable { observers: v }
    }

    pub fn add_log(&self, log: &str) {
        for observer in &self.observers {
            observer.map(|observer| observer(log));
        }
    }

    #[must_use]
    pub fn register_observer<'observer, 'sc>(
        &'sc self,
        observer: &'observer (Fn(&str) + 'static),
    ) -> Option<Dropper<'observer, 'sc, Fn(&str) + 'static>> {
        let sc = self.observers.iter().find(|observer| observer.is_none())?;
        Some(sc.set(observer))
    }
}

fn foo_print(x: &str) {
    println!("Foo received '{}'", x)
}

fn main() {
    let visitable = Visitable::new(10);
    visitable.add_log("Lost for science!");
    {
        let _dropper = visitable.register_observer(&foo_print);
        visitable.add_log("Registered log");
        {
            let name = String::from("Bar");
            let lambda = move |log : &_| println!("{} received '{}'", name, log);
            let _dropper_2 = visitable.register_observer(&lambda);
            visitable.add_log("Registered log 2");
        }
    }
    visitable.add_log("Lost for science!");
}

Storing an object in a Sc does not incur any allocation, clone or copy of the object.
Code is here: https://github.com/dureuill/sc
Caveat: Sc uses unsafe, and I know of at least one cause of unsafety: if you mem::forget a _dropper. See also my other thread.

For the observer pattern, you might be interested in the library I wrote for FlowBetween, flo_binding: it has a few neat features driven by the needs of the larger application. Pertinent to this discussion is the follow() function, which turns a Bound value into a stream of changes, which is another way to solve the lifetime issues talked about here.

That Sc type looks like a really nice idea to me too: sort of the borrowing equivalent of Weak.

This is exactly what I was going for, yes. Should I go all the way and call that WeakRef, WeakBorrow or BorrowWeak?
The only thing that bothers me is that if you mem::forget the Dropper type, you then have a dangling reference in safe code. On the plus side, I believe calling mem::forget is the only way to leak a Dropper: Dropper is parameterized by the lifetime of the object it points to, so any cycle containing droppers cannot outlive these lifetimes? I wish mem::forget weren't safe for non 'static types^^".

Perhaps what is needed is a way to specify that a type cannot be safely mem::forgotten? How unsafe is my reference type?