Using STUN Round-trip time to compute latency of a p2p connection

use std::{
    collections::HashMap,
    sync::{
        atomic::{AtomicBool, Ordering},
        Arc, Mutex,
    },
    thread::sleep,
    time::{Duration, Instant},
};

use tokio::{
    net::UdpSocket,
    sync::{
        mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
        oneshot::{channel, Receiver, Sender},
    },
};

use rand::{rngs::StdRng, RngCore, SeedableRng};

trait StunPacketTrait<const N: usize> {
    fn transaction_id(&self) -> &[u8];
}

impl<const N: usize> StunPacketTrait<N> for [u8; N] {
    fn transaction_id(&self) -> &[u8] {
        &self[8..][..12]
    }
}
struct StunPacket {
    packet: [u8; 20],
}

impl StunPacket {
    fn new() -> StunPacket {
        let mut packet: [u8; 20] = [0; 20];
        // binding request
        packet[0] = 0x00;
        packet[1] = 0x01;
        // attribute
        packet[2] = 0x00;
        packet[3] = 0x00;
        // magic cookie
        packet[4..8].copy_from_slice(&[0x21, 0x12, 0xA4, 0x42]);
        // transaction id
        let mut rng = StdRng::from_entropy();
        rng.fill_bytes(&mut packet[8..]);

        StunPacket { packet }
    }

    fn transaction_id(&self) -> &[u8] {
        self.packet.transaction_id()
    }
}




struct Peer {
    remote_socket: String,
    cooldown: Duration,
    stop_rx: Receiver<()>,
    result_tx: UnboundedSender<Duration>,
}

impl Peer {
    fn new(
        remote_socket: String,
        cooldown: Option<Duration>,
    ) -> (Self, Sender<()>, UnboundedReceiver<Duration>) {
        let (stop_tx, stop_rx) = channel::<()>();
        let (result_tx, result_rx) = unbounded_channel::<Duration>();
        let peer = Peer {
            remote_socket,
            cooldown: cooldown.unwrap_or(Duration::from_millis(400)),
            stop_rx,
            result_tx,
        };
        (peer, stop_tx, result_rx)
    }

    async fn monitor(&mut self) {
        let run = Arc::new(AtomicBool::new(true));
        if self.stop_rx.try_recv().is_ok() {
            run.store(false, Ordering::SeqCst);
        }

        let send_times = Arc::new(Mutex::new(HashMap::new()));
        let local_socket = UdpSocket::bind("0.0.0.0:0")
            .await
            .expect("Error binding to local socket");
        let sender_jh = async {
            while run.load(Ordering::SeqCst) {
                // println!("Sending STUN to {}", self.remote_socket);
                let stun = StunPacket::new();
                send_times
                    .lock()
                    .expect("Unable to lock mutex for in send loop")
                    .insert(stun.transaction_id().to_owned(), Instant::now());
                local_socket
                    .send_to(&stun.packet, &self.remote_socket)
                    .await
                    .expect("Error sending stun packet");
                tokio::time::sleep(self.cooldown).await; // Cooldown
            }
        };

        let listener_jh = async {
            while run.load(Ordering::SeqCst) {
                // println!("Listening for STUN from {}", self.remote_socket);
                let mut buf = [0; 128];
                match local_socket.recv_from(&mut buf).await {
                    Ok((len, _)) if len >= 20 => {
                        if let Some(send_time) = send_times.lock().unwrap().remove(buf.transaction_id())
                        {
                            // println!("Packet recognized!");
                            let stun_rtt = send_time.elapsed();
                            self.result_tx.send(stun_rtt).expect("Error sending rtt");
                        } else {
                            // println!("Unrecognized transaction ID");
                        }
                    }
                    Ok(_) => {
                        // println!("Packet ignored");
                    }
                    Err(_e) => {
                        // println!("Data not received: {}", _e);
                    }
                }
                tokio::time::sleep(self.cooldown).await; // Cooldown
            }
        };

        tokio::join!(sender_jh, listener_jh);
    }
}

#[tokio::main]
async fn main() {
    let remote_socket = "stun.cope.es:3478".to_string();
    let (mut peer, stop_tx, mut duration_rx) = Peer::new(remote_socket.to_owned(), None);

    tokio::task::spawn(async move {
        peer.monitor().await;
    });

    tokio::task::spawn(async move {
        while let Some(latency) = duration_rx.recv().await {
            println!("Current latency with {} is: {:.2?}", remote_socket, latency);
        }
    });

    sleep(Duration::from_millis(6000)); // timeout
    stop_tx.send(()).unwrap();
}

This code will be used to monitor current latency packet loss with players connected via steam lobby in multiplayer pvp games. I am not using ICMP echo requests in fear of many NAT devices blocking these requests by default.

I'm sending empty stun packets and storing the send times in a hashtable with the key being the transaction id.

I am not happy that the constructor emits two channels along with the Peer object. If I could find a way to listen to durations and send stop signal via struct methods, it'd be nicer.

Github gist: Send and receive STUN packets to compute p2p latency ยท GitHub

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.