How to make this code more flexible/generic?

This following code compiles, but only if I stick to the same type in the Observable<T>

trait Observable<T> {}

struct BaseObservable<T>(T);

struct OtherObservable<T>(T);

impl<T> Observable<T> for BaseObservable<T> {}

impl<T> Observable<T> for OtherObservable<T> {}

struct Subscriber<N> {
    output_observable: Box<dyn Observable<N>>,
}

impl<N: 'static> Subscriber<N> {
    fn new(v: N) -> Self {
        Subscriber {
            output_observable: Box::new(BaseObservable(v)),
        }
    }
    
    fn set(&mut self, v: impl Observable<N> + 'static) {
        self.output_observable = Box::new(v);
    }
}


fn main() {
    let mut s = Subscriber::new(8); // start with integer
    s.set(OtherObservable(9));      // can continue only with integers from now on
}

I would like to have something like this:

fn main() {
    let mut s = Subscriber::new(8); // start with integer
    s.set(OtherObservable('n'));     // continue with char or any type not just integer
}

I know where this restriction is, it is in the definition of Subscriber<N>

struct Subscriber<N> {
    output_observable: Box<dyn Observable<N>>,
}

where generic N is propagated further to dynamic trait object, but I don't know how to edit this to break from N, requirement being that trait Observable<T> keeps it's signature with generic.

I have tried with adding more generics, function generics, playing with std::marker::Any but with no success.

What are you trying to achieve with this? How should a subscriber of one type behave when you set an observer of a mismatching type? Should it be ignored? Should it blow up? Are you trying to go full dynamic? In that case, you should probably not bother with a generic type parameter in the first place.

Code is a smaller version of a bigger one. set() method here is just a test and does not exist in the original code.

Basically idea is that user can instantiate Subscriber<N> object and call subscribe(Observer) give it Observer<T> object which in turn has next(T) method that would accept any type.

However to add functionality to alter the stream there would be also a pipe(mut self, *** Vector of Observable factory functions **) method on Subscriber where user before calling subscribe calls pipe gives it a Vector of transformation Observables e.g. MapObservable and/or FilterObservable.

To be more accurate Subscriber.pipe() takes Vector of callbacks that each takes Observable and returns Observable, it is just used for wrapping Observables within one another.

pipe() method wraps Observable-0 with in Observable-1 and so on until it goes trough all observables in a Vector.

After pipe, Subscriber has final output Observable as its base.
There is also a subscribe(observer) method with in Observable.
So Subscriber.subscribe(observer) calls base Observable.subscribe(observer) which in turn calls its wrapped inner Observable.subscribe() and so on until it reaches root.

Each call upstream to Observable.subscribe(Observer<T>) also wraps Observer to add extra functionality.

// Just a global helper function to create Observer
pub fn observe<N>(next_fnc: impl FnMut(N) + 'static) -> Observer<N> {
    Observer {
        next_fn: Box::new(next_fnc)
    }
}

This part would be MapObservable (just called Map here) implementation, global map can be turned into MapObservable::new()

pub struct Map<T: Observable<V>, V, R> {
    inner_observable: T,
    transform_fn: fn(V) -> R,
}

pub fn map<T: Observable<V>, V, R>(mapfn: fn(V) -> R) -> impl FnMut(T) -> Map<T, V, R> {
     move |o: T| -> Map<T, V, R> {
        Map { inner_observable: o, transform_fn: mapfn, }
    }
}

impl<T: Observable<V>, V: 'static, R: 'static> Observable<R> for Map<T, V, R> {

    fn subscribe(self, mut observer: Obs<R>) -> Subject {

        let map_observer = observe(move |i: V| {
            let v = (self.transform_fn)(i);
            observer.next(v);

        });
        self.inner_observable.subscribe(map_observer)
    }
}

It should remove old Observable put it in the new one, this is pipe implementation on Subscriber:

impl<N, O: Observable<N>> Subscriber<N, O> { 
    pub fn pipe(mut self, observables: &mut [impl FnMut(O) -> O]) -> Self {
        let mut o = self.output_observable.unwrap();  // In the original code output_observable is in the Option::Some<T>

        for i in observables {
            o = i(o);
        }
        
        self.output_observable = Some(o);
        self
    }
}

Sorry that I couldn't be more concise but that is the general idea.

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.