Subscriber maps and iterator invalidation

Hello!

I'm using the excellent capnproto for RPC, and I have both C++ and Rust implementations of a Pub/Sub scheme.

To do so, in Rust, I'm using something similar to what is presented in the Rust capnproto-rpc pub/sub example. That is, I have a SubscriberMap type that newtypes a HashMap<u64, SubscriberHandle>, and a Subscription type that, when dropped, removes the SubscriberHandle with the corresponding u64 from the SubscriberMap.

Basically (adapted from capnproto-rpc pub/sub example):

struct Subscriber { /* fields omitted */ }

struct SubscriberMap {
    // Map of unique_id => subscriber
    subscribers: HashMap<u64, Subscriber>,
}

// A handle to a subscription, to be kept by the subscriber.
// When the subscriber drops this handle, it is removed from the SubscriberMap
struct Subscription {
    id: u64,  // Unique id of the subscriber, same as in the map
    subscribers: Rc<RefCell<SubscriberMap>>, // Reference to the SubscriberMap
}

// Remove our subscriber from the map when subscription is dropped
impl Drop for Subscription {
    fn drop(&mut self) {
        self.subscribers.borrow_mut().subscribers.remove(&self.id);
    }
}

From there, the publisher can have a subscribe RPC method that takes the client's Subscriber capability and returns the Subscription capability to the client:

struct Publisher {
    next_id: u64, // generator of unique ids for subscribers
    subscribers: Rc<RefCell<SubscriberMap>>, // SubscriberMap to share with the subscriptions
}

impl Publisher {
    // simplified signature
    fn subscribe(&mut self,
                 client: Subscriber)
                 -> Subscription
    {
        // Capture the id before incrementing, so that the Subscription
        // later removes the entry that was just inserted.
        let id = self.next_id;
        self.next_id += 1;
        self.subscribers.borrow_mut().subscribers.insert(id, client);
        Subscription::new(id, self.subscribers.clone())
    }

    fn publish(&self, msg: &str) {
        // send msg to each Subscriber currently active in the map
    }
}

Note that all of this is single threaded, since capnproto operates on a single thread.

As you can see, this scheme uses a SubscriberMap that is shared between the Publisher and each Subscription by means of Rc<RefCell<_>>, so that the Subscription can remove the Subscriber from the map on drop, and the Publisher can add subscribers (and iterate over them).
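For anyone who wants to run the scheme described above, here is a minimal self-contained sketch. The name field on Subscriber, the Vec<String> returned by publish, and the demo function are assumptions made purely for illustration; a real Subscriber would wrap an RPC capability and publish would send to it.

```rust
use std::cell::RefCell;
use std::collections::HashMap;
use std::rc::Rc;

// Hypothetical stand-in for the client capability.
struct Subscriber {
    name: String,
}

#[derive(Default)]
struct SubscriberMap {
    subscribers: HashMap<u64, Subscriber>,
}

struct Subscription {
    id: u64,
    subscribers: Rc<RefCell<SubscriberMap>>,
}

impl Drop for Subscription {
    fn drop(&mut self) {
        // One hash lookup per drop, plus the RefCell borrow check.
        self.subscribers.borrow_mut().subscribers.remove(&self.id);
    }
}

#[derive(Default)]
struct Publisher {
    next_id: u64,
    subscribers: Rc<RefCell<SubscriberMap>>,
}

impl Publisher {
    fn subscribe(&mut self, client: Subscriber) -> Subscription {
        let id = self.next_id;
        self.next_id += 1;
        self.subscribers.borrow_mut().subscribers.insert(id, client);
        Subscription { id, subscribers: Rc::clone(&self.subscribers) }
    }

    // Stand-in for sending: returns one line per active subscriber, sorted.
    fn publish(&self, msg: &str) -> Vec<String> {
        let map = self.subscribers.borrow();
        let mut delivered: Vec<String> = map
            .subscribers
            .values()
            .map(|s| format!("{} <- {}", s.name, msg))
            .collect();
        delivered.sort();
        delivered
    }
}

// Subscribes two clients, drops one handle, and reports how many
// deliveries each publish call made.
fn demo() -> (usize, usize) {
    let mut publisher = Publisher::default();
    let alice = publisher.subscribe(Subscriber { name: "alice".into() });
    let _bob = publisher.subscribe(Subscriber { name: "bob".into() });
    let before = publisher.publish("hello").len();
    drop(alice); // removes alice's entry from the shared map
    let after = publisher.publish("bye").len();
    (before, after)
}
```

Note that the Rc keeps the map alive even if the Publisher is dropped first, which is exactly the co-ownership discussed here.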

Now, my question revolves around the fact that I don't need a shared_ptr in the C++ code that is equivalent to the Rust code above. Basically, the trick is to use an ordered map rather than an unordered_map (the C++ equivalent of the HashMap), and count on the fact that in such a map an iterator is only invalidated when the element it points to is removed, not when other elements are added or removed.

Taking that into account, the C++ implementation looks like the following:

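#include <cstddef>
#include <map>
#include <string_view>
#include <utility>

class Subscriber { /* members omitted */ };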

using SubscriberMap = std::map<std::size_t, Subscriber>;

class Subscription {
    SubscriberMap::iterator it_;
    SubscriberMap* subscribers_;

public:

    // lifetime(Subscription) < lifetime(SubscriberMap)
    explicit Subscription(SubscriberMap::iterator it, SubscriberMap& subscribers)
      : it_(it)
      , subscribers_(&subscribers)
    {}

    // Rule of 5 shenanigans
    Subscription(const Subscription&) = delete;
    Subscription(Subscription&& o); // Leave o empty to prevent double frees
    Subscription& operator=(const Subscription&) = delete;
    Subscription& operator=(Subscription&& o); // Leave o empty to prevent double frees

    ~Subscription() {
        if (not subscribers_) {
            return;
        }
        subscribers_->erase(it_);
    }
};

class Publisher {
    std::size_t next_id_ = 0;
    SubscriberMap subscribers_;

public:
    Subscription subscribe(Subscriber client) {
        auto id = ++next_id_;
        // subscribers_ is a member object here, not a pointer
        auto itbool = subscribers_.insert({id, std::move(client)});
        return Subscription { itbool.first, subscribers_ };
    }

    void publish(std::string_view msg) { /* iterate subscribers to send msg */ }
};

For this to be safe, I just need to make sure of three things:

  1. No Subscription can outlive a Publisher, because the Publisher owns the SubscriberMap, and lifetime(Subscription) < lifetime(SubscriberMap) must hold.
  2. No iterator contained in the Subscription can be invalidated during the lifetime of the Subscription. This holds because an iterator is only invalidated when the element it points to is removed from the map. The only other place that produces iterators pointing to the same element is the publish function (while iterating), but these iterators are never passed to erase and remain local to publish. Note that no two Subscriptions can have the same id.
  3. None of the iterators used in publish can be invalidated. This is fairly trivial, because these iterators are local to publish, and we are single threaded so publish and subscribe cannot be called concurrently.

So, looking at all this, there looks to be a small overhead in the Rust version compared to the C++ version. Also, conceptually, it doesn't feel right to me that the SubscriberMap be co-owned by all the Subscriptions, as I feel they should rather reference it.

Besides, in the Rust version we must perform an additional map lookup in the drop implementation, because we store the id instead of the iterator.

  1. Is there a way to eschew the Rc<RefCell<x>> from the Rust version?
  2. Is there a way to skip searching our id in the map at the moment we are removing the element, like in the C++ version?
  3. Is this doable without unsafe?
  4. Is this doable with a std collection? (I don't want to re-implement my own collection just so I can be sure of the "invalidation" properties)
  5. Is this doable in a number of lines comparable to the initial Rust/C++ implementations?

I feel that, because Rust lacks the notion of "operations that can invalidate iterators", and, most importantly, "operations that cannot invalidate iterators" we cannot express the problem like we did in the C++ version, but I'd love to be wrong!

Thank you for reading!

You can get rid of the Rc by letting Subscription hold a regular reference to SubscriberMap and letting the compiler enforce the lifetime restrictions:

struct SubscriberMap {
    subscribers: RefCell<HashMap<u64, Subscriber>>,
}

struct Subscription<'map> {
    id: u64,
    subscribers: &'map SubscriberMap,
}
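To make the reference-based layout concrete, here is a rough sketch of how the rest of the code might look. Everything beyond the two structs above is an assumption for illustration: the name field, the Cell<u64> id counter, the Vec<String> returned by publish, and the demo function. One consequence worth noting is that subscribe has to take &self rather than &mut self, because every live Subscription keeps a shared borrow of the Publisher.

```rust
use std::cell::{Cell, RefCell};
use std::collections::HashMap;

// Hypothetical stand-in for the client capability.
struct Subscriber {
    name: String,
}

#[derive(Default)]
struct SubscriberMap {
    subscribers: RefCell<HashMap<u64, Subscriber>>,
}

struct Subscription<'map> {
    id: u64,
    subscribers: &'map SubscriberMap,
}

impl Drop for Subscription<'_> {
    fn drop(&mut self) {
        // Still one hash lookup, but no reference counting.
        self.subscribers.subscribers.borrow_mut().remove(&self.id);
    }
}

#[derive(Default)]
struct Publisher {
    // Cell, so that ids can be generated through a shared reference.
    next_id: Cell<u64>,
    subscribers: SubscriberMap,
}

impl Publisher {
    // The returned Subscription borrows self, so the compiler rejects
    // any Subscription that would outlive the Publisher.
    fn subscribe(&self, client: Subscriber) -> Subscription<'_> {
        let id = self.next_id.get();
        self.next_id.set(id + 1);
        self.subscribers.subscribers.borrow_mut().insert(id, client);
        Subscription { id, subscribers: &self.subscribers }
    }

    // Stand-in for sending: returns one line per active subscriber, sorted.
    fn publish(&self, msg: &str) -> Vec<String> {
        let map = self.subscribers.subscribers.borrow();
        let mut delivered: Vec<String> =
            map.values().map(|s| format!("{} <- {}", s.name, msg)).collect();
        delivered.sort();
        delivered
    }
}

// Subscribes two clients, drops one handle, and reports how many
// deliveries each publish call made.
fn demo() -> (usize, usize) {
    let publisher = Publisher::default();
    let alice = publisher.subscribe(Subscriber { name: "alice".into() });
    let _bob = publisher.subscribe(Subscriber { name: "bob".into() });
    let before = publisher.publish("hello").len();
    drop(alice);
    let after = publisher.publish("bye").len();
    (before, after)
}
```

The lifetime parameter plays the role of the lifetime(Subscription) < lifetime(SubscriberMap) comment in the C++ version, except that here it is checked at compile time.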

Also, depending on your data usage patterns, a Vec<Option<Subscriber>> might work better than HashMap<u64, Subscriber>. You have to skip the removed subscribers while you're iterating, but can calculate the location of a particular subscriber directly from its id. You can maintain a freelist of reusable ids to avoid runaway use of memory.
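The Vec-with-freelist idea could be sketched roughly like this. The type name SubscriberSlots, its method names, and the name field are all hypothetical; this is just one way to lay it out, not a drop-in replacement.

```rust
// Hypothetical stand-in for the client capability.
struct Subscriber {
    name: String,
}

#[derive(Default)]
struct SubscriberSlots {
    slots: Vec<Option<Subscriber>>,
    // Indices of vacated slots, available for reuse.
    free: Vec<usize>,
}

impl SubscriberSlots {
    // Stores `client` and returns its id, which is its index in `slots`.
    fn insert(&mut self, client: Subscriber) -> usize {
        match self.free.pop() {
            Some(id) => {
                self.slots[id] = Some(client);
                id
            }
            None => {
                self.slots.push(Some(client));
                self.slots.len() - 1
            }
        }
    }

    // Vacates the slot by direct indexing: no hashing, no search.
    fn remove(&mut self, id: usize) -> Option<Subscriber> {
        let removed = self.slots.get_mut(id)?.take();
        if removed.is_some() {
            self.free.push(id);
        }
        removed
    }

    // Iterates over live subscribers, skipping vacated slots.
    fn iter(&self) -> impl Iterator<Item = &Subscriber> + '_ {
        self.slots.iter().flatten()
    }
}
```

One caveat of reusing ids: a stale id can end up naming a newer subscriber, so each id should be handed to exactly one Subscription and removed exactly once.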


I see. If I'm spending a lot of time iterating over my Subscribers, a Vec makes more sense, even if I have to manage "sparseness".

Having a reference instead of the Rc removes my main objection :-). I guess the RefCell is unavoidable (it always feels strange to "pay" for a boolean check when I am sure that I will only ever retrieve references inside of functions, where I know that the references won't escape).

If you can get away with preallocation for all your potential subscribers, you might consider storing RefCells inside your collection instead of a single one outside. Then, each subscription can hold a single active borrow for its entire lifetime, at the expense of not being able to alter the structure it's contained in.
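Here is one possible reading of that suggestion, as a sketch only. It assumes a fixed number of slots chosen up front, and it uses each slot's RefCell borrow state as the "occupied" marker: a Subscription holds a shared Ref for its whole lifetime, and a slot that can be mutably borrowed is considered free. The with_capacity constructor, the name field, the Vec<String> returned by publish, and the demo function are hypothetical.

```rust
use std::cell::{Ref, RefCell};

// Hypothetical stand-in for the client capability.
struct Subscriber {
    name: String,
}

struct Publisher {
    // Allocated once; never resized while Subscriptions exist.
    slots: Vec<RefCell<Option<Subscriber>>>,
}

// Holding the Ref keeps the slot borrowed; dropping it frees the slot.
// No Drop impl and no lookup are needed.
struct Subscription<'a> {
    _slot: Ref<'a, Option<Subscriber>>,
}

impl Publisher {
    fn with_capacity(n: usize) -> Self {
        Publisher { slots: (0..n).map(|_| RefCell::new(None)).collect() }
    }

    // Returns None when every slot is taken.
    fn subscribe(&self, client: Subscriber) -> Option<Subscription<'_>> {
        for slot in &self.slots {
            // Succeeds only if no Subscription currently holds this slot.
            if let Ok(mut free) = slot.try_borrow_mut() {
                *free = Some(client);
                drop(free); // release the mutable borrow before re-borrowing
                return Some(Subscription { _slot: slot.borrow() });
            }
        }
        None
    }

    // Stand-in for sending: returns one line per active subscriber.
    fn publish(&self, msg: &str) -> Vec<String> {
        let mut delivered = Vec::new();
        for slot in &self.slots {
            // A mutably borrowable slot has no live Subscription. It may
            // still contain a stale Subscriber, so skip it explicitly.
            if slot.try_borrow_mut().is_ok() {
                continue;
            }
            if let Some(s) = slot.borrow().as_ref() {
                delivered.push(format!("{} <- {}", s.name, msg));
            }
        }
        delivered
    }
}

// Fills both slots, checks that a third subscribe is refused, then
// drops one handle and publishes again.
fn demo() -> (usize, usize, bool) {
    let publisher = Publisher::with_capacity(2);
    let alice = publisher.subscribe(Subscriber { name: "alice".into() });
    let _bob = publisher.subscribe(Subscriber { name: "bob".into() });
    let full = publisher.subscribe(Subscriber { name: "carol".into() }).is_none();
    let before = publisher.publish("hello").len();
    drop(alice);
    let after = publisher.publish("bye").len();
    (before, after, full)
}
```

The trade-offs in this sketch are a linear scan in subscribe, a hard cap on the number of subscribers, and a vacated slot that keeps its old Subscriber alive until the slot is reused.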
