Trying to use capturing closures when constructing a type consisting of arbitrary functions

I ported the reference implementation of a TC39 proposal to Rust and got it compiling, but I want to add some missing functional methods such as map and filter (sometimes called retain in Rust). Here's Observer (which implements the AbstractObserver trait):

pub struct Observer<T, Error = ()>
    where
        T: Send + Sync + 'static,
        Error: Send + Sync + 'static
{
    /// Receives the next value in the sequence.
    pub next: Box<dyn Fn(T) + Sync + Send>,
    /// Receives the sequence error.
    pub error: Box<dyn Fn(Error) + Sync + Send>,
    /// Receives a completion notification.
    pub complete: Box<dyn Fn() + Sync + Send>,
    /// Receives the subscription object when `subscribe` is called.
    pub start: Option<Box<dyn Fn(Arc<Subscription<T, Error>>) + Sync + Send>>,
}

Right now I'm trying to add map and filter first, but inside their inner subscription I need to capture the subscription observer:

impl<T, Error> Observable<T, Error>
    where
        T: Send + Sync + 'static,
        Error: Send + Sync + 'static
{
    /// Returns a new `Observable` that performs a map on data from the original.
    pub fn map<U>(&self, map_fn: impl Fn(T) -> U + Send + Sync + 'static) -> Observable<U, Error>
        where
            T: Clone,
            U: Send + Sync + 'static
    {
        let f: SubscriberFunction<U, Error> = Arc::new(move |observer| {
            let observer = Arc::new(observer);
            let (observer_1, observer_2, observer_3) = (Arc::clone(&observer), Arc::clone(&observer), Arc::clone(&observer));
            let subscription = self.subscribe(observer! {
                next: move |value: T| {
                    observer_1.next(map_fn(value.clone()));
                },
                error: move |error| {
                    observer_2.error(error);
                },
                complete: move || {
                    observer_3.complete();
                },
            });
            Arc::new(move || {
                subscription.unsubscribe();
            })
        });
        Observable::<U, Error>::new(f)
    }
}

The problem is, when I use the move keyword on a closure, the captured value apparently needs to implement the Copy trait. It looks like I can't add Copy as a bound to my function types, so I'm lost on what to do.

Here's the source (uses std only):

(P.S.: they are not "iterators", that's why I don't use Iterator)

To clarify, here's what I'm getting:

            let subscription = self.subscribe(observer! {
                next: move |value: T| {
                //    ^^^^^^^^^^^^^^^
                //    cannot move out of `map_fn`, a captured variable in an `Fn` closure


I'm not too familiar with this whole space, but you might be able to do something like this (untested):

     pub fn map<U>(&self, map_fn: impl Fn(T) -> U + Send + Sync + 'static) -> Observable<U, Error>
         where
             T: Clone,
             U: Send + Sync + 'static
     {
+        let map_fn = Arc::new(map_fn);
         let f: SubscriberFunction<U, Error> = Arc::new(move |observer| {
+            let map_fn = map_fn.clone();
             let observer = Arc::new(observer);
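
To show the idea in isolation, here is a minimal sketch. `Factory` and `make_factory` are names made up for this example (they are not from your crate); `Factory` only imitates the shape of `SubscriberFunction`, i.e. an outer `Fn` that builds a new inner callback on every call. Wrapping the callback in an `Arc` and cloning the handle inside the outer closure lets the outer closure stay `Fn`:

```rust
use std::sync::Arc;

// Hypothetical stand-in for `SubscriberFunction`: an outer `Fn` that
// builds and returns a fresh inner callback every time it is called.
type Factory<T, U> = Arc<dyn Fn() -> Box<dyn Fn(T) -> U + Send + Sync> + Send + Sync>;

fn make_factory<T, U>(map_fn: impl Fn(T) -> U + Send + Sync + 'static) -> Factory<T, U>
where
    T: 'static,
    U: 'static,
{
    // Share the callback behind an `Arc` so it can be handed out repeatedly.
    let map_fn = Arc::new(map_fn);
    Arc::new(move || -> Box<dyn Fn(T) -> U + Send + Sync> {
        // Clone the handle on every call. The outer closure keeps its own
        // `Arc`, so nothing is moved out of it and it remains `Fn`.
        let map_fn = Arc::clone(&map_fn);
        Box::new(move |value: T| map_fn(value))
    })
}

fn main() {
    let factory = make_factory(|x: i32| x * 2);
    // The factory can be called more than once.
    let first = factory();
    let second = factory();
    assert_eq!(first(21), 42);
    assert_eq!(second(5), 10);
}
```

Without the `Arc::clone` inside the outer closure, the inner `move` closure would try to take `map_fn` out of the outer closure's captures, which is what the "cannot move out of `map_fn`" error is complaining about.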

Thanks! Now just one more error... (I didn't mention it before because I thought just solving move of map_fn would solve it too.)


I suspect the problem is the use of self (which is a reference) in the SubscriberFunction closure, which has an implicit 'static bound due to how you defined the SubscriberFunction type. I don't think you would gain much by allowing non-'static values there (you'd probably just get other errors down the line), so the easy solution is to capture only 'static values. In this case you could clone self before creating f. Observable doesn't currently implement Clone, but its only field is an Arc, so cloning is cheap: you can either clone that field directly or implement Clone.
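
Roughly what I mean, as a small self-contained sketch. `Source` and `summer` are hypothetical names that only mimic the situation (a struct whose single field is an `Arc`, and a method that has to return a `'static` closure); they are not your actual types:

```rust
use std::sync::Arc;

// Hypothetical, simplified stand-in for `Observable`: its only field
// is an `Arc`, so making an owned copy is cheap.
struct Source {
    values: Arc<Vec<i32>>,
}

impl Source {
    // The returned closure is `'static`, so it must not borrow `self`.
    fn summer(&self) -> Box<dyn Fn() -> i32 + Send + Sync + 'static> {
        // Build an owned handle *before* creating the closure...
        let owned = Source { values: Arc::clone(&self.values) };
        // ...and move that handle in instead of capturing `&self`.
        Box::new(move || -> i32 { owned.values.iter().sum() })
    }
}

fn main() {
    let source = Source { values: Arc::new(vec![1, 2, 3]) };
    let summer = source.summer();
    // The closure owns its own `Arc`, so dropping `source` is fine.
    drop(source);
    assert_eq!(summer(), 6);
}
```

If the closure used `self.values` directly, it would capture the `&self` reference, and the compiler would reject it because that borrow cannot satisfy the `'static` bound.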

By the way, I suggest not reading the errors from the VSCode interface: they tend to lose context, since a single error can refer to multiple separate lines. If you run cargo check you will see much more clearly which lines of code are involved in your error and why.


I got my thoughts mixed up because a Box<dyn Fn> doesn't support clone, but yes, Arc::clone should work within that closure. Cloning just self wasn't enough to make the diagnostic go away, though, so I ended up with:

let orig = Self { subscriber: Arc::clone(&self.subscriber) };

Then I invoked orig.subscribe instead of self.subscribe. I've also added 'static bounds to each closure in the subscriber function alias. I think it's good now.

That's probably because:

  • derive(Clone) adds a T: Clone, Error: Clone bound on the implementation

  • in map only T: Clone is declared, so the Error: Clone bound is not satisfied and the Self type doesn't implement Clone

  • self however is a reference, and references do implement Clone

  • so you end up with a clone of the reference itself, which solves nothing because it isn't 'static and you wanted an owned clone of the Observable because that's 'static

The solution is to replace the derive(Clone) with a manual implementation with the right trait bounds:

impl<T, Error> Clone for Observable<T, Error>
where
    T: Send + Sync + 'static,
    Error: Send + Sync + 'static,
    // `derive(Clone)` would wrongly add these too, but you don't want them:
    // T: Clone,
    // Error: Clone,
{
    fn clone(&self) -> Self {
        Self {
            subscriber: Arc::clone(&self.subscriber)
        }
    }
}

As you can see the implementation is exactly the same as what you did, but now you can actually call it with .clone() because the trait bounds are not excessively constraining.
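
Here is a stripped-down sketch of the same pattern you can compile on its own. `Handle`, `NotClone` and `owned_copy` are hypothetical names for this example; `Handle` just mirrors the shape of `Observable` (a single `Arc` field):

```rust
use std::sync::Arc;

// Hypothetical wrapper mirroring the shape of `Observable`: one `Arc` field.
struct Handle<T> {
    inner: Arc<T>,
}

// Manual impl: no `T: Clone` bound, because only the `Arc` is cloned.
impl<T> Clone for Handle<T> {
    fn clone(&self) -> Self {
        Handle { inner: Arc::clone(&self.inner) }
    }
}

// A payload that deliberately does not implement `Clone`.
struct NotClone(u32);

// Generic over any `T` with no `Clone` bound available.
fn owned_copy<T>(handle: &Handle<T>) -> Handle<T> {
    // Resolves to `Handle<T>::clone`, which yields an owned value.
    handle.clone()
}

fn main() {
    let original = Handle { inner: Arc::new(NotClone(7)) };
    let copy = owned_copy(&original);
    // Both handles point at the same allocation.
    assert!(Arc::ptr_eq(&original.inner, &copy.inner));
    assert_eq!(Arc::strong_count(&original.inner), 2);
    assert_eq!(copy.inner.0, 7);
}
```

If you swap the manual impl for `#[derive(Clone)]`, I'd expect `owned_copy` to stop compiling with a type mismatch, because `handle.clone()` would then clone the reference and produce a `&Handle<T>` instead of a `Handle<T>`.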
