Hello!
I'm using the excellent capnproto
for RPC, and I have both C++
and Rust implementations of a Pub/Sub scheme.
To do so, in Rust, I'm using something similar to what is presented in the Rust capnproto-rpc
pub/sub example. That is, I have a SubscriberMap
type that newtypes a HashMap<u64, SubscriberHandle>
, and a Subscription
type that, when dropped, remove the SubscriberHandle
with the corresponding u64
from the SubscriberMap
.
Basically (adapted from capnproto-rpc
pub/sub example):
struct Subscriber { /* fields omitted */ }
struct SubscriberMap {
// Map of unique_id => subscriber
subscribers: HashMap<u64, Subscriber>,
}
// A handle to a subscription, to be kept by the subscriber.
// When the subscriber drops this handle, it is removed from the SubscriberMap
struct Subscription {
id: u64, // Unique id of the subscriber, same as in the map
subscribers: Rc<RefCell<SubscriberMap>>, // Reference to the SubscriberMap
}
// Remove our subscriber from the map when subscription is dropped
impl Drop for Subscription {
fn drop(&mut self) {
self.subscribers.borrow_mut().subscribers.remove(&self.id);
}
}
From there, the publisher can have a subscribe
RPC method that take the client's Subscriber capability and return the Subscription capability to the client:
struct Publisher {
next_id: u64, // generator of unique ids for subscribers
subscribers: Rc<RefCell<SubscriberMap>>, // SubscriberMap to share with the subscriptions
}
impl Publisher {
// simplified signature
fn subscribe(&mut self,
client: Subscriber)
-> Subscription
{
self.subscribers.borrow_mut().subscribers.insert(
self.next_id,
client
);
self.next_id += 1;
Subscription::new(self.next_id, self.subscribers.clone())))
}
fn publish(msg: &str) {
// send msg to each Subscriber currently active in the map
}
}
Note that all of this is single threaded, since capnproto
operates on a single thread.
Now, as you can notice, this scheme uses a SubscriberMap
that is shared between the Publisher
and each Subscription
by means of Rc<RefCell<_>>
, so that the Subscription
can remove the Subscriber
from the map on drop, and the Publisher
can add subscribers (and iterate them).
Now, my question resolves around the fact that I don't need a shared_ptr
in the C++
code that is equivalent to the Rust code above. Basically, the trick is to use an ordered map
rather than an unordered_map
(the C++
equivalent of the HashMap
), and count on the fact that in such a map iterators are only invalidated on removal, not when other elements are added.
Taking that into account, the C++
implementation looks like the following:
class Subscriber;
using SubscriberMap = std::map<std::size_t, Subscriber>;
class Subscription {
SubscriberMap::iterator it_;
SubscriberMap* subscribers_;
public:
// lifetime(Subscription) < lifetime(SubscriberMap)
explicit Subscription(SubscriberMap::iterator it, SubscriberMap& subscribers)
: it_(it)
, subscribers_(&subscribers)
{}
// Rule of 5 shenanigans
Subscription(const Subscription&) = delete;
Subscription(Subscription&& o); // Leave o empty to prevent double frees
Subscription& operator=(const Subscription&) = delete;
Subscription& operator=(Subscription&& o); // Leave o empty to prevent double frees
~Subscription() {
if (not subscribers_) {
return;
}
subscribers_->erase(it_);
}
};
class Publisher {
std::size_t next_id_ = 0;
SubscriberMap subscribers_;
public:
Subscription subscribe(Subscriber client) {
auto id = ++next_id_;
auto itbool = subscribers_->insert({id, std::move(client)});
return Subscription { itbool.first, subscribers_ };
}
void publish(std::string_view msg) { /* iterate subscribers to send msg */ }
};
For this to be safe, I just need to make sure of three things:
- No
Subscription
can outlive aPublisher
, because thePublisher
owns theSubscriberMap
, andlifetime(Subscription) < lifetime(SubscriberMap)
must hold. - No iterator contained in the
Subscription
can be invalidated during the lifetime of theSubscription
. This holds because iterator invalidation only happens when we remove the pointed element from the map, and the only place we produce other iterators pointing to the same element is in thepublish
function (while iterating), but these iterators are not passed toerase
inpublish
, and remain local topublish
. Note that no twoSubscription
s can have the same id. - None of the iterators used in
publish
can be invalidated. This is fairly trivial, because these iterators are local topublish
, and we are single threaded sopublish
andsubscribe
cannot be called concurrently.
So, looking at all this, there looks to be a small overhead in the Rust version compared to the C++
version. Also, conceptually, it doesn't feel right to me that the SubscriberMap
be co-owned by all the Subscription
s, as I feel they should rather reference it.
Besides, in the Rust version we must perform an additional map lookup in the drop implementation, because we store the id instead of the iterator.
- Is there a way to eschew the
Rc<RefCell<x>>
from the Rust version? - Is there a way to skip searching our id in the map at the moment we are removing the element, like in the
C++
version? - Is this doable without
unsafe
? - Is this doable with a
std
collection? (I don't want to re-implement my own collection just so I can be sure of the "invalidation" properties) - Is this doable in a number of lines comparable to the initial Rust/
C++
implementations?
I feel that, because Rust lacks the notion of "operations that can invalidate iterators", and, most importantly, "operations that cannot invalidate iterators" we cannot express the problem like we did in the C++
version, but I'd love to be wrong!
Thank you for reading!