In a method in the trait impl, I spawn a thread that will read from a socket and notify all subscribers:
```rust
let subscribers = self.subscribers.clone();
let handle: JoinHandle<String> = std::thread::spawn(move || loop {
    let msg = socket.read().expect("Error reading message");
    for subscriber in subscribers.lock().unwrap().iter() {
        subscriber.0.send(msg.to_string()).unwrap();
    }
});
```
This all works fine.
Of course, I had to use Arc to get around lifetime issues. (Mutex would be necessary regardless of lifetimes because there is multithreaded access to subscribers, from the subscribe method.)
My question is, is this idiomatic Rust? Is there a better way to do it?
The thing is, this style will result in a proliferation of Arc in the system: every trait object whose data needs to be used in a channel thread will have to be wrapped in Arc. This doesn't feel right to me, but I'm still a Rust newbie, so maybe it is.
std::sync::mpsc::Sender is already clonable, and clones will send to the same receiver as the original, so you could eliminate the Arc and Mutex and then do:
```rust
let senders = self.subscribers.iter().map(|(sender, _)| sender.clone()).collect::<Vec<_>>();
let handle: JoinHandle<String> = std::thread::spawn(move || loop {
    let msg = socket.read().expect("Error reading message");
    for sender in &senders {
        sender.send(msg.to_string()).unwrap();
    }
});
```
The growable collection behind a lock is not terrible, honestly. It is probably the easiest naive solution. But I would like to better understand why the collection contains sender-receiver pairs. Presumably these are separate halves of two different channels for bidirectional communication. Because the spawned thread does not use the receiver, it seems pointless to include it in the collection.
As an aside, it looks like you are trying to create a broadcast channel. A crate like bus can do this. You will still need to put it behind Arc<Mutex> to dynamically add new subscribers: any multithreaded collection needs these two elements (ref counting and synchronization) somewhere. Ref counting “extends the lifetime” when sharing across threads, because the Arc owns the data and therefore satisfies the 'static bound that thread::spawn requires.
A bespoke data structure could optimize the locking behavior in many ways: splitting the collection into fixed-size spans, each with its own lock; swapping the Mutex for an RwLock; using a linked list to grow the collection; etc. Writing concurrent data structures is hard, and I wouldn't do it without loom at the very least, but it's an option if you need performance characteristics that a single Mutex cannot provide.
It's not a broadcast mechanism - it's point-to-point.
I decided to store (sender, rx) because the data provider (this class) will use the sender half, the client will use the rx half, and when the client unsubscribes, the rx it passes can be used to find the tuple in the collection and remove it.
Is copying one message to multiple receivers not broadcast? What is your definition of broadcast? Or how would you define point-to-point in a way that makes the given for loop sensible? That for loop is precisely what a single-producer-multiple-consumer broadcast channel does.
The reason channels are split into halves is so they can be decoupled. If you are intent on coupling the channel halves, you could replace the pair with a VecDeque to hold messages. That is essentially what a channel is under the hood, it just gives you disjoint types to push_front and pop_back on a shared collection.
Unsubscribe can be done with a handle, which is a fancy way to say “index”, but the handle can have some of its own behaviors and can resist tampering. [1]
You're completely right regarding the above. Again, I failed to explicate completely.
It looks broadcasty now, but, in final form, each subscriber can choose the set of symbols it's interested in. IOW, the messages will vary by receiver.
(Of course, a design alternative would be to have receivers filter on their end. But that actually creates legal problems - it may be that certain subscribers are not entitled to market data from some providers. It's messy.)
In that case, it will almost certainly be better to just let the subscriber keep the Receiver. You can detect when sending that a receiver has been dropped and then drop the sender; this is more efficient and less error-prone.
Sorry; by “subscriber” I meant the part of the program which actually wants the information and will consume the strings, not your subscribers vector.
However, I like the cleanliness of using an explicit unsubscribe.
The problem with only explicit unsubscribes is that unforeseen conditions can mean that nobody remembers to unsubscribe, at which point you have a memory leak (because the subscribers vector grows and because the contents of each unread channel grow).
It's possible to have leaks with unsubscribe-on-drop, but harder for them to happen accidentally. And explicitly dropping the receiver forms an explicit unsubscribe any time you want.
Yes, that's what I meant too. (The subscribers collection here holds (sender, receiver) tuples, as noted, and the subscriber objects get a ref to the receiver when they subscribe.)