I've tried converting a TC39 proposal's reference implementation in Rust from their code and got it compiling, but I want to add some missing functional methods such as map
and filter
(or called retain in Rust). Here's Observer
(which implements the AbstractObserver
trait):
pub struct Observer<T, Error = ()>
where
T: Send + Sync + 'static,
Error: Send + Sync + 'static
{
/// Receives the next value in the sequence.
pub next: Box<dyn Fn(T) + Sync + Send>,
/// Receives the sequence error.
pub error: Box<dyn Fn(Error) + Sync + Send>,
/// Receives a completion notification.
pub complete: Box<dyn Fn() + Sync + Send>,
/// Receives the subscription object when `subscribe` is called.
pub start: Option<Box<dyn Fn(Arc<Subscription<T, Error>>) + Sync + Send>>,
}
Right now I'm trying to add firstly map
and filter
, but inside their inner subscription I need to capture the subscription observer:
impl<T, Error> Observable<T, Error>
where
T: Send + Sync + 'static,
Error: Send + Sync + 'static
{
/// Returns a new `Observable` that performs a map on data from the original.
pub fn map<U>(&self, map_fn: impl Fn(T) -> U + Send + Sync + 'static) -> Observable<U, Error>
where
T: Clone,
U: Send + Sync + 'static
{
let f: SubscriberFunction<U, Error> = Arc::new(move |observer| {
let observer = Arc::new(observer);
let (observer_1, observer_2, observer_3) = (Arc::clone(&observer), Arc::clone(&observer), Arc::clone(&observer));
let subscription = self.subscribe(observer! {
next: move |value: T| {
observer_1.next(map_fn(value.clone()));
},
error: move |error| {
observer_2.error(error);
},
complete: move || {
observer_3.complete();
},
});
Arc::new(move || {
subscription.unsubscribe();
})
});
Observable::<U, Error>::new(f)
}
}
The problem is, when I use the move
prefix in a closure, it ends up implementing the Copy
trait. It looks like I can't add Copy
as a bound to my function types, so I'm lost on what to do.
Here's the source (uses std
only):
(P.S.: they are not "iterators", that's why I don't use Iterator
)
To clarify, here's what I'm getting:
let subscription = self.subscribe(observer! {
next: move |value: T| {
// ^^^^^^^^^^^^^^^
// cannot move out of `map_fn`, a captured variable in an `Fn` closure