Sending Tokio UdpSocket via Channel

Hi,

I want to develop a UDP proxy in Rust.
The proxy should listen for incoming connections on a specific port and forward the received datagrams to a remote target.
While this is trivial to implement on its own, the proxy should also maintain sessions, and this is where it becomes difficult.

Datagrams from each client should be forwarded to the target with a (temporarily) unique source port on the proxy side.
Example:
192.168.1.2 -> 192.168.1.10:10000 (Proxy) 192.168.1.10:56789 -> 123.123.123.123:12345
192.168.1.3 -> 192.168.1.10:10000 (Proxy) 192.168.1.10:56790 -> 123.123.123.123:12345
192.168.1.4 -> 192.168.1.10:10000 (Proxy) 192.168.1.10:56791 -> 123.123.123.123:12345

The outgoing ports on the proxy side can be assigned randomly, but should (as stated above) not change for a client.
If a client has not sent any datagrams for, e.g., 5 seconds, its "port reservation" can be discarded again.
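
The idle-timeout rule above can be sketched independently of any networking code. The following is a minimal, std-only illustration, not a definitive design: `Sessions`, `touch`, and `evict_idle` are hypothetical names, and the stored value `T` stands in for whatever the proxy keeps per client (for example the client's outgoing socket). The current time is passed in explicitly so the logic is easy to test.

```rust
use std::collections::HashMap;
use std::net::SocketAddr;
use std::time::{Duration, Instant};

/// Hypothetical session table: client address -> (per-client value, last activity).
struct Sessions<T> {
    idle_timeout: Duration,
    entries: HashMap<SocketAddr, (T, Instant)>,
}

impl<T> Sessions<T> {
    fn new(idle_timeout: Duration) -> Self {
        Sessions { idle_timeout, entries: HashMap::new() }
    }

    /// Records activity for `client` at `now`; `make` runs only on first use.
    fn touch(&mut self, client: SocketAddr, now: Instant, make: impl FnOnce() -> T) -> &T {
        let entry = self.entries.entry(client).or_insert_with(|| (make(), now));
        entry.1 = now; // reset the idle timer
        &entry.0
    }

    /// Drops every session idle for at least `idle_timeout`; returns how many were removed.
    fn evict_idle(&mut self, now: Instant) -> usize {
        let before = self.entries.len();
        let timeout = self.idle_timeout;
        self.entries
            .retain(|_, (_, last_seen)| now.duration_since(*last_seen) < timeout);
        before - self.entries.len()
    }
}

fn main() {
    let start = Instant::now();
    let mut sessions = Sessions::new(Duration::from_secs(5));
    let a: SocketAddr = "192.168.1.2:40000".parse().unwrap();
    let b: SocketAddr = "192.168.1.3:40000".parse().unwrap();

    // Here the stored value is just a pretend source port.
    sessions.touch(a, start, || 56789u16);
    sessions.touch(b, start, || 56790u16);

    // Client `a` sends again after 3 seconds and keeps its reservation.
    let port = *sessions.touch(a, start + Duration::from_secs(3), || 0);
    assert_eq!(port, 56789);

    // After 6 seconds only `b` has been idle for 5 seconds or more.
    let removed = sessions.evict_idle(start + Duration::from_secs(6));
    assert_eq!(removed, 1);
    assert!(sessions.entries.contains_key(&a));
}
```

In an async proxy, something like `evict_idle` would presumably be called periodically, for instance on each tick of a `tokio::time::interval`; that wiring is left out here.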

To allow the proxy to send from different source ports in parallel, one socket has to be allocated and bound per active client.

My first idea was to implement a connection manager tokio task to manage the client-to-proxy-socket relation using a hashmap:

    let (cm_cmd_tx, mut cm_cmd_rx) = mpsc::channel::<(SocketAddr, tokio::sync::oneshot::Sender<&Arc<UdpSocket>>)>(100); // connection_manager_command_channel
    let mut connections = HashMap::<SocketAddr, Arc<UdpSocket>>::new();

    let connection_manager = task::spawn(async move {
        loop {
            if let Some((addr, response_channel)) = cm_cmd_rx.recv().await {
                match connections.get(&addr) {
                    None => {
                        connections.insert(addr.clone(), Arc::new(UdpSocket::bind(ss_local_addr).await.unwrap()));
                        // #TODO Start timer here to delete the socket again after a few seconds...
                    }
                    Some(socket) => {
                        // #TODO Reset timer again, as the client is still active...
                        match response_channel.send(socket) {
                            Ok(_) => {}
                            Err(_) => {}
                        }
                    }
                }
            }
        }
    });

However, I do not know how to share the Tokio UdpSocket via the response_channel. I get the following error:

error[E0597]: `connections` does not live long enough
  --> proxy\src\main.rs:42:23
   |
36 |     let (cm_cmd_tx, mut cm_cmd_rx) = mpsc::channel::<(SocketAddr, tokio::sync::oneshot::Sender<&Arc<UdpSocket>>)>(100); // connection_man...
   |                     ------------- lifetime `'1` appears in the type of `cm_cmd_rx`
...
42 |                 match connections.get(&addr) {
   |                       ^^^^^^^^^^^^^^^^^^^^^^ borrowed value does not live long enough
...
49 |                         match response_channel.send(socket) {
   |                               ----------------------------- argument requires that `connections` is borrowed for `'1`
...
57 |     });
   |     - `connections` dropped here while still borrowed

How would you implement such a connection manager? Would you send the sockets by reference, or by value? How would this be implemented?

Thanks for your support. If you require more information, please let me know!

You need to send an Arc<UdpSocket>, not a reference to it. Arc is already a smart pointer and will take care of "referencing".

What would this look like? Imagine:

    let (cm_cmd_tx, mut cm_cmd_rx) = mpsc::channel::<(SocketAddr, tokio::sync::oneshot::Sender<Arc<UdpSocket>>)>(100); // connection_manager_command_channel
    let mut connections = HashMap::<SocketAddr, Arc<UdpSocket>>::new();

    let connection_manager = task::spawn(async move {
        loop {
            if let Some((addr, response_channel)) = cm_cmd_rx.recv().await {
                match connections.get(&addr) {
                    None => {
                        connections.insert(addr.clone(), Arc::new(UdpSocket::bind(ss_local_addr).await.unwrap()));
                        // #TODO Start timer here to delete the socket again after a few seconds...
                    }
                    Some(socket) => {
                        // #TODO Reset timer again, as the client is still active...
                        match response_channel.send(*socket) {
                            Ok(_) => {}
                            Err(_) => {}
                        }
                    }
                }
            }
        }
    });

This will result in an error, as a Socket is not 'Copy':

error[E0507]: cannot move out of `*socket` which is behind a shared reference
  --> proxy\src\main.rs:49:53
   |
49 |                         match response_channel.send(*socket) {
   |                                                     ^^^^^^^ move occurs because `*socket` has type `Arc<tokio::net::UdpSocket>`, which does not implement the `Copy` trait


No, that's not what the error says. The error says that an Arc<UdpSocket> is not Copy. That it is wrapped in an Arc is important. To fix it, simply clone the Arc.
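
A minimal sketch of that fix, under some assumptions: to keep it dependency-free it uses `std::net::UdpSocket` and `std::sync::mpsc` in place of the Tokio types from the thread, and `socket_for` is a hypothetical helper name. The ownership rules are the same either way: the map keeps one `Arc`, and a cloned `Arc` is what travels through the channel.

```rust
use std::collections::HashMap;
use std::net::{SocketAddr, UdpSocket};
use std::sync::{mpsc, Arc};

/// Hypothetical helper: returns the socket reserved for `client`,
/// binding a new one on first use.
fn socket_for(
    connections: &mut HashMap<SocketAddr, Arc<UdpSocket>>,
    client: SocketAddr,
) -> std::io::Result<Arc<UdpSocket>> {
    if let Some(socket) = connections.get(&client) {
        // Arc::clone copies the pointer and bumps the reference count;
        // the socket itself is not duplicated.
        return Ok(Arc::clone(socket));
    }
    // Port 0 asks the OS to pick a free source port (loopback is an
    // assumption made so the sketch runs anywhere).
    let socket = Arc::new(UdpSocket::bind("127.0.0.1:0")?);
    connections.insert(client, Arc::clone(&socket));
    Ok(socket)
}

fn main() -> std::io::Result<()> {
    let mut connections = HashMap::new();
    let client: SocketAddr = "192.168.1.2:40000".parse().unwrap();

    // The channel carries an owned Arc<UdpSocket>, so nothing borrows from the map.
    let (tx, rx) = mpsc::channel::<Arc<UdpSocket>>();
    tx.send(socket_for(&mut connections, client)?).unwrap();
    tx.send(socket_for(&mut connections, client)?).unwrap();

    let first = rx.recv().unwrap();
    let second = rx.recv().unwrap();
    // Both handles point at the same socket, hence the same source port.
    assert!(Arc::ptr_eq(&first, &second));
    assert_eq!(first.local_addr()?, second.local_addr()?);
    Ok(())
}
```

Applied to the code in the question, the corresponding change is `response_channel.send(Arc::clone(socket))` (or `socket.clone()`) instead of `send(*socket)`. Note also that the `None` branch in the question never answers the requester; the sketch returns a socket in both cases.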

Thanks @alice! You are right! I was blind!
