[Solved] Share a connection in HashSet (?)

I'm writing a service that broadcasts events to websocket clients, using warp. It has a POST HTTP handler that receives signals from other services, and a websocket handler that end users' browsers connect to.

Here is the main idea. When a user's browser connects to the websocket handler, I create an mpsc channel and put the tx half into a shared global storage (based on Arc<RwLock<HashSet>>).

When an HTTP request with an event arrives at the HTTP handler, it takes read access to the global storage, iterates over the tx halves, and sends the event to every user.

The code is something like this:

async fn web_socket_handler(conn, storage) {
    let (tx, rx) = mpsc::channel();

    // with write lock
    storage.add(tx.clone());

    tokio::spawn({
        while rx { ... }
    });

    while conn {
    }

    // delete tx from storage
}

async fn http_handler(body, storage) {
    // with read lock
    for i in storage {
        i.send(body);
    }
}

So I have two questions:

  1. To add tx to the HashSet storage I need the Eq and Hash traits to be implemented, but I can't work out how to do that for the current tx type (mpsc::Sender). Or maybe I've chosen the wrong type for the storage?
  2. Even after solving 1, there is still the question of how to delete the value from the HashSet. Am I right that if the original tx hashes the same as its clone, I can do this:
async fn web_socket_handler(conn, storage) {
    let (tx, rx) = mpsc::channel();

    storage.add(tx.clone());
    ...
    storage.remove(tx);
}

Putting mpsc channels in a collection is perfectly fine (though you might want to wrap each one in a struct as described here). However, it shouldn't be a HashSet, since mpsc::Sender implements neither Eq nor Hash. Consider using a hash map with IDs or a vector.

Thanks, I see. But to use a HashMap, for example, I still need to generate IDs somehow? And with a vector I need to delete the tx channel at some point, but how? I can't rely on indexes because the storage length can change. Is there another way to find the value I added?

As for generating IDs, you can just use a counter. As for a vector, you can remove items by index like this:

let mut to_remove = Vec::new();
for (i, chan) in channels.iter_mut().enumerate() {
    if ... {
        to_remove.push(i);
    }
}
for i in to_remove.into_iter().rev() {
    channels.swap_remove(i);
}

The above code stores the indexes to remove, then removes them. Since the items are removed in reverse order, the indexes it still needs to remove are not moved around by previous iterations of the loop, so it doesn't remove the wrong things.
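
For illustration, here is a minimal runnable sketch of that pattern. It assumes std::sync::mpsc instead of tokio's channel so that it runs without dependencies, and the function name broadcast_and_prune is made up for this example. With std's Sender, send takes &self, so plain iter() is enough here.

```rust
use std::sync::mpsc::{channel, Sender};

/// Sends `event` on every channel and removes the channels whose receiver
/// is gone. Returns how many channels were removed.
/// (Hypothetical helper, not part of any library.)
fn broadcast_and_prune(channels: &mut Vec<Sender<String>>, event: &str) -> usize {
    let mut to_remove = Vec::new();
    for (i, chan) in channels.iter().enumerate() {
        // With std's mpsc, `send` fails only when the receiving half was dropped.
        if chan.send(event.to_string()).is_err() {
            to_remove.push(i);
        }
    }
    let removed = to_remove.len();
    // Highest index first: `swap_remove(i)` moves the last element into slot `i`,
    // which never disturbs the smaller indexes still waiting to be removed.
    for i in to_remove.into_iter().rev() {
        channels.swap_remove(i);
    }
    removed
}

fn main() {
    let (tx_a, rx_a) = channel::<String>();
    let (tx_b, rx_b) = channel::<String>();
    let (tx_c, rx_c) = channel::<String>();
    let mut channels = vec![tx_a, tx_b, tx_c];

    drop(rx_b); // simulate a client that has disconnected

    let removed = broadcast_and_prune(&mut channels, "hello");
    println!("removed {}, {} still connected", removed, channels.len());

    // The two live receivers both got the event.
    assert_eq!(rx_a.recv().unwrap(), "hello");
    assert_eq!(rx_c.recv().unwrap(), "hello");
}
```

Note that swap_remove does not preserve the order of the remaining channels, which should not matter for a broadcast list.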

Haha, the most interesting part is in this if ... 🙂 If I clone the tx half, how can I find it in the vector? Are tx and tx.clone() equal?

I mean will this work?:

let (tx, _rx) = mpsc::channel();
let mut storage = vec![];
let mut to_remove = vec![];

storage.push(tx.clone());

for (i, chan) in storage.iter_mut().enumerate() {
    if chan == tx {
        to_remove.push(i);
    }
}

Also, by the way, why is iter_mut used here?

I think you misunderstood my code. The if ... is supposed to mean something like "here you try to send on the channel, and if the send fails, you remove it since that means the connection is gone". It's not supposed to mean "here we check if the channel is the one we are looking for". If you need to support that operation, then you should probably be using a hashmap with IDs instead, and then you just remove the ID directly instead of trying to compare the channels.
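
As a rough sketch of the hashmap-with-IDs idea: the Registry struct and its add/remove methods below are hypothetical names for this example, and std::sync::mpsc is assumed in place of tokio's channel so the snippet runs on its own.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Sender};

/// Hypothetical registry: maps a connection ID to that connection's sender.
struct Registry {
    next_id: usize,
    clients: HashMap<usize, Sender<String>>,
}

impl Registry {
    fn new() -> Self {
        Registry { next_id: 0, clients: HashMap::new() }
    }

    /// Stores the sender and returns the ID needed to remove it later.
    fn add(&mut self, tx: Sender<String>) -> usize {
        let id = self.next_id;
        self.next_id += 1;
        self.clients.insert(id, tx);
        id
    }

    /// Removes by ID, so senders never have to be compared with each other.
    /// Returns true if an entry with that ID existed.
    fn remove(&mut self, id: usize) -> bool {
        self.clients.remove(&id).is_some()
    }
}

fn main() {
    let mut registry = Registry::new();
    let (tx, rx) = channel();

    // On connect: remember the ID that was handed out.
    let id = registry.add(tx);

    registry.clients[&id].send("event".to_string()).unwrap();
    println!("client {} got {:?}", id, rx.recv().unwrap());

    // On disconnect: remove by the remembered ID.
    registry.remove(id);
    assert!(registry.clients.is_empty());
}
```

The handler only has to keep the usize it got back from add, which sidesteps the question of whether tx and tx.clone() compare equal.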

The iter_mut is in use because I guessed that sending on the channel requires mutable access to it. (I may be wrong.)

Ah, I see, the life cycle of the elements probably wasn't clear from my description. The storage lives for the whole lifetime of the app. The websocket handler can run for a long time (until the user closes the tab in their browser), while the HTTP handler runs only for the duration of an incoming request. The storage is mutated in the websocket handler (a channel is added when someone connects and removed on disconnection). In the HTTP handler, the service only iterates over the storage (read access) to send the event across the channels. I don't know if it's clearer now 🤞
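
That life cycle could be sketched roughly as below. This is only an illustration under assumptions: it uses std's RwLock and std::sync::mpsc rather than tokio types, the connect, disconnect and broadcast functions are hypothetical stand-ins for the two handlers, and the IDs are passed in by the caller.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, RwLock};

// Shared storage that lives as long as the app does.
type Storage = Arc<RwLock<HashMap<usize, Sender<String>>>>;

/// What the websocket handler would do on connect (write lock).
fn connect(storage: &Storage, id: usize) -> Receiver<String> {
    let (tx, rx) = channel();
    storage.write().unwrap().insert(id, tx);
    rx
}

/// What the websocket handler would do on disconnect (write lock).
fn disconnect(storage: &Storage, id: usize) {
    storage.write().unwrap().remove(&id);
}

/// What the HTTP handler would do (read lock only).
/// Returns the number of clients the event was delivered to.
fn broadcast(storage: &Storage, event: &str) -> usize {
    let clients = storage.read().unwrap();
    let delivered = clients
        .values()
        .filter(|tx| tx.send(event.to_string()).is_ok())
        .count();
    delivered
}

fn main() {
    let storage: Storage = Arc::new(RwLock::new(HashMap::new()));

    let rx1 = connect(&storage, 1);
    let rx2 = connect(&storage, 2);
    assert_eq!(broadcast(&storage, "first"), 2);

    disconnect(&storage, 2);
    assert_eq!(broadcast(&storage, "second"), 1);

    println!("client 1 received: {:?}", rx1.try_iter().collect::<Vec<_>>());
    println!("client 2 received: {:?}", rx2.try_iter().collect::<Vec<_>>());
}
```

Broadcasting needs only the read lock because sending does not modify the map itself.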

That's more or less what I already guessed was the situation.

Ah, OK, I see that the only way here is to use a hashmap with IDs. The only remaining thing is how to generate them, but that's another question and I think I'll manage. Thanks for your help 👍

AtomicUsize is often good for generating ids.
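
A minimal sketch of such a generator, assuming a global counter is acceptable; NEXT_ID and next_id are made-up names for this example.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Hypothetical global counter shared by all connections.
static NEXT_ID: AtomicUsize = AtomicUsize::new(0);

/// Hands out the next unused ID.
fn next_id() -> usize {
    // `fetch_add` returns the previous value and increments atomically,
    // so two concurrent callers never receive the same ID.
    NEXT_ID.fetch_add(1, Ordering::Relaxed)
}

fn main() {
    let a = next_id();
    let b = next_id();
    assert_ne!(a, b);
    println!("ids: {} {}", a, b);
}
```

Relaxed ordering should be enough here because the counter only has to produce distinct values and is not used to synchronize other memory.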
