[Newbie] Having difficulties with lifetime bounds

Hi, I'm trying to recreate the reactive-streams interfaces with Rust traits, but I'm running into problems with lifetime bounds. Consider the following traits:

use std::error::Error;
use std::fmt::Debug;

pub trait Subscription: Debug + Send + Sync {
  fn request(&self, n: usize);
  fn cancel(&self);
}

pub trait Subscriber<'a, T>: Debug + Send + Sync {
  fn on_subscribe<S: Subscription>(&self, subscription: &'a S);
  fn on_next(&self, value: &T);
  fn on_error<E: Error>(&self, error: &E);
  fn on_complete(&self);
}

pub trait Publisher<'a, T>: Debug + Send + Sync {
  fn subscribe<S: Subscriber<'a, T>>(&self, subscriber: &'a S);
}

When I try to implement a simple publisher (e.g. ArrayPublisher), I get some errors:

#[derive(Debug)]
struct ArrayPublisher<T> {
  array: Vec<T>,
}

impl<'a, T: Debug + Clone + Send + Sync + 'a> Publisher<'a, T> for ArrayPublisher<T> {
  fn subscribe<S: Subscriber<'a, T>>(&self, subscriber: &'a S) {
    let subscription = ArraySubscription::new(self.array.to_vec(), subscriber);
    subscriber.on_subscribe(&subscription);
  }
}

#[derive(Debug, Clone)]
pub struct ArraySubscription<'a, T, S: Subscriber<'a, T>> {
  array: Vec<T>,
  subscriber: &'a S,
}

impl<'a, T: Debug, S: Subscriber<'a, T>> ArraySubscription<'a, T, S> {
  pub fn new(array: Vec<T>, subscriber: &'a S) -> Self {
    ArraySubscription { array, subscriber }
  }
}

impl<'a, T: Debug + Send + Sync, S: Subscriber<'a, T>> Subscription for ArraySubscription<'a, T, S> {
  fn request(&self, n: usize) {
    // Clamp to the array length so a large `n` cannot index out of bounds.
    for value in self.array.iter().take(n) {
      self.subscriber.on_next(value);
    }
  }
  
  fn cancel(&self) {
    unimplemented!();
  }
}

The problem is that the subscription variable doesn't live long enough (for the 'a lifetime). This makes sense, since 'a is declared on the impl block and therefore outlives the body of subscribe, where subscription is only a local. My guess is that I need something like Higher-Rank Trait Bounds, but I don't know how to apply them. I want to stay really generic here, since every publisher can have different lifetimes and implementations. Any suggestions?

I'm having a bit of trouble following how this is supposed to operate. At the very least, you're going to want some type erasure somewhere, with something like Box<dyn Subscription + 'a> or Rc<dyn Subscription + 'a>, so that publishers and/or subscribers can hold multiple implementations of Subscription in the same container.
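To make the type-erasure idea concrete, here is a minimal sketch of what I mean, not a full design. The Subscription trait is the one from the question; CountingSubscription, NoopSubscription, request_all and demo_boxed are hypothetical names made up for illustration. Note that a plain Box<dyn Subscription> defaults to a 'static bound, so add + 'a if the implementations need to borrow.

```rust
use std::fmt::Debug;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

pub trait Subscription: Debug + Send + Sync {
    fn request(&self, n: usize);
    fn cancel(&self);
}

/// Hypothetical implementation that adds up how many items were requested.
#[derive(Debug)]
struct CountingSubscription {
    requested: Arc<AtomicUsize>,
}

impl Subscription for CountingSubscription {
    fn request(&self, n: usize) {
        self.requested.fetch_add(n, Ordering::SeqCst);
    }
    fn cancel(&self) {}
}

/// Hypothetical implementation that ignores every call.
#[derive(Debug)]
struct NoopSubscription;

impl Subscription for NoopSubscription {
    fn request(&self, _n: usize) {}
    fn cancel(&self) {}
}

/// Requests `n` items from every stored subscription, whatever its concrete type.
fn request_all(subscriptions: &[Box<dyn Subscription>], n: usize) {
    for subscription in subscriptions {
        subscription.request(n);
    }
}

/// Builds a mixed container, drives it twice, and returns the counter's total.
fn demo_boxed() -> usize {
    let requested = Arc::new(AtomicUsize::new(0));
    let subscriptions: Vec<Box<dyn Subscription>> = vec![
        Box::new(CountingSubscription { requested: Arc::clone(&requested) }),
        Box::new(NoopSubscription),
    ];
    request_all(&subscriptions, 3);
    request_all(&subscriptions, 2);
    requested.load(Ordering::SeqCst)
}
```

This works because Subscription has no generic methods, so it can be used as a trait object. Subscriber as written in the question cannot, since on_subscribe<S> and on_error<E> are generic; those would have to take something like a boxed or reference-counted dyn Subscription and a &dyn Error instead.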

Can you describe a typical example of how you want to use this system after it's been implemented? That would help a lot with giving more concrete advice.


Edit: Attempting to understand the docs of the original implementation:

  • Anything implementing the trait Publisher<T> is a source of events of type T. This trait provides one method, subscribe(s), which registers Subscriber s to receive these events.

  • During this method call, a publisher-specific implementation of Subscription is given to s, which is used for flow control. It provides two methods, one to request the next n events and another to cancel the subscription.

  • Also inside subscribe(), the publisher needs to store some kind of reference to the subscriber so that it can call on_next(), on_complete(), and on_error() when appropriate.

It's unclear whether publishers are meant to outlive their associated subscribers or vice versa, and the answer has strong implications for how lifetimes should appear in the Rust API. If there isn't supposed to be a strict relationship here, you may need to use Rc or Arc references instead of lifetime-annotated ones.
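For what it's worth, here is a rough sketch of how the Arc route could look, under some assumptions of mine: the 'a parameters are dropped, the generic methods are replaced with trait objects, and T is required to be 'static so the subscription can be handed out as Arc<dyn Subscription>. Collector and demo_arc are hypothetical names used only for the example, and request() always restarts from the beginning of the array to keep things short.

```rust
use std::error::Error;
use std::fmt::Debug;
use std::sync::{Arc, Mutex};

pub trait Subscription: Debug + Send + Sync {
    fn request(&self, n: usize);
    fn cancel(&self);
}

pub trait Subscriber<T>: Debug + Send + Sync {
    fn on_subscribe(&self, subscription: Arc<dyn Subscription>);
    fn on_next(&self, value: &T);
    fn on_error(&self, error: &dyn Error);
    fn on_complete(&self);
}

pub trait Publisher<T>: Debug + Send + Sync {
    fn subscribe(&self, subscriber: Arc<dyn Subscriber<T>>);
}

#[derive(Debug)]
struct ArrayPublisher<T> {
    array: Vec<T>,
}

/// Owns a copy of the data and a shared handle to the subscriber,
/// so no borrowed lifetime ties it to the body of `subscribe`.
#[derive(Debug)]
struct ArraySubscription<T> {
    array: Vec<T>,
    subscriber: Arc<dyn Subscriber<T>>,
}

impl<T: Debug + Send + Sync> Subscription for ArraySubscription<T> {
    fn request(&self, n: usize) {
        // Simplification: always starts at index 0 and clamps to the array length.
        for value in self.array.iter().take(n) {
            self.subscriber.on_next(value);
        }
    }
    fn cancel(&self) {}
}

impl<T: Debug + Clone + Send + Sync + 'static> Publisher<T> for ArrayPublisher<T> {
    fn subscribe(&self, subscriber: Arc<dyn Subscriber<T>>) {
        let subscription = Arc::new(ArraySubscription {
            array: self.array.clone(),
            subscriber: Arc::clone(&subscriber),
        });
        subscriber.on_subscribe(subscription);
    }
}

/// Hypothetical subscriber that requests two items and records what it receives.
#[derive(Debug, Default)]
struct Collector {
    seen: Mutex<Vec<i32>>,
}

impl Subscriber<i32> for Collector {
    fn on_subscribe(&self, subscription: Arc<dyn Subscription>) {
        subscription.request(2);
    }
    fn on_next(&self, value: &i32) {
        self.seen.lock().unwrap().push(*value);
    }
    fn on_error(&self, _error: &dyn Error) {}
    fn on_complete(&self) {}
}

/// Subscribes a collector to a three-element publisher and returns what it saw.
fn demo_arc() -> Vec<i32> {
    let publisher = ArrayPublisher { array: vec![10, 20, 30] };
    let collector = Arc::new(Collector::default());
    publisher.subscribe(collector.clone());
    let seen = collector.seen.lock().unwrap().clone();
    seen
}
```

One thing to watch with this shape: the subscription holds an Arc to the subscriber, so if the subscriber also stores the Arc<dyn Subscription> it receives, the two keep each other alive. Holding one side as a Weak is the usual way to break that kind of cycle.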


My suggestion is not to copy Java.
A good starting point is to write tests that exercise the behavior you want. Without spending a lot of time studying it, it is hard to tell what your traits are trying to achieve.

Yeah, after reading your response I got the result I wanted by using Arc, which makes a lot of sense. It also makes the impl a lot simpler. Thanks for the help!
