Fast writes to tokio UdpSocket seem to break data integrity

I am writing a proxy multiplexer. The basic idea is that a primary accepts SOCKS5 proxy requests from clients and hands the jobs to workers. The workers connect to the remote site and send the response back to the primary, which then sends it back to the client.
My worker and the primary talk over a UDP socket, using a simple wire protocol to serialize and distinguish the data. I know the protocol itself works, because when I add sleeps between the writes, data integrity is fine.

client <--tcp--> my primary <--udp--> my worker <--tcp--> remote site

Here's my send/receive implementation:

impl WorkerConn {
    pub async fn recv(&self, buf: &mut [u8]) -> io::Result<usize> {
        let (n, addr) = self.sock.recv_from(buf).await?;
        if addr != self.addr {
            dbg!("unauthenticated connection from worker socket");
            return Err(io::Error::new(
                io::ErrorKind::NotConnected,
                "unauthenticated",
            ));
        }
        Ok(n)
    }
    pub async fn send(&self, buf: &[u8]) -> io::Result<usize> {
        let n = self.sock.send_to(buf, self.addr).await?;
        // with this sleep things seem to work fine, at the cost of transfer speed
        // sleep(Duration::from_millis(50)).await;
        if n != buf.len() {
            println!("not eq");
            abort();
        }
        Ok(n)
    }
}

This is how I read from the client and send to the worker:

            loop {
                if client_writer.upgrade().is_none() {
                    //client writer dropped stop upload
                    println!("client writer dropped");
                    break;
                }
//client buf size 50kb
//worker buf size 100kb

                let len = cr.read(&mut buf).await;
                if len.is_err() {
                    conns.write().await.remove(&client_id);
                    return;
                }
                let len = len.unwrap_or(0) as u16;
                if len == 0 {
                    conns.write().await.remove(&client_id);
                    return;
                }
                let cmd_msg: Vec<u8> = concat![
                    &[Cmd::Data.into()],
                    client_id.as_bytes(),
                    &len.to_be_bytes(),
                    &buf[..len as usize]
                ];
                //CmdData   |  client_id   |    length   |   data       |
                // 1octet   |   32octets   |    2octets  |  n octets    | 35+
                let _ = worker.send(&cmd_msg).await;
            }
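The frame layout in the comment above (1-byte command, 32-byte client id, 2-byte big-endian length, data) can be written as a small standalone encoder. This is a sketch, not the poster's code: `CMD_DATA` is a placeholder because the real `Cmd::Data` value isn't shown, and the function rejects payloads longer than 65,535 bytes instead of truncating the length with `as u16`. The 50 KB buffers used here fit, but a larger buffer would silently corrupt the length field.

```rust
use std::convert::TryFrom;

/// Placeholder command byte; the real `Cmd::Data` value is not shown in the post.
const CMD_DATA: u8 = 1;
const CLIENT_ID_LEN: usize = 32;
/// CmdData (1) + client_id (32) + length (2) = 35 bytes.
const HEADER_LEN: usize = 1 + CLIENT_ID_LEN + 2;

/// Builds `CmdData | client_id | length (u16, big-endian) | data`.
/// Returns None if the payload does not fit in the 2-byte length field.
fn encode_data_frame(client_id: &[u8; CLIENT_ID_LEN], data: &[u8]) -> Option<Vec<u8>> {
    let len = u16::try_from(data.len()).ok()?;
    let mut frame = Vec::with_capacity(HEADER_LEN + data.len());
    frame.push(CMD_DATA);
    frame.extend_from_slice(client_id);
    frame.extend_from_slice(&len.to_be_bytes());
    frame.extend_from_slice(data);
    Some(frame)
}
```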

And this is from worker to client:

        Cmd::Data => {
            //download
            let client = clients
                .read()
                .await
                .get(parts.client_id().to_string().as_str())
                .cloned();
            if client.is_none() {
                let _ = worker
                    .send(&concat!(&[Cmd::End.into()], parts.client_id_bytes()))
                    .await;
                return Some(parts);
            }
            let client = client.unwrap();
            {
                let worker = worker.clone();
                let end_data: Vec<u8> = concat!(&[Cmd::End.into()], parts.client_id_bytes());
                let data = parts.data().to_vec();
                dbg!("download", data.len());
                tokio::spawn(async move {
                    // client write_all might take forever; it should not affect
                    // the rest of the users
                    let mut client = client.lock().await;
                    select! {
                        r = client.write_all(&data) =>{
                            if let Err(_) = r{
                                let _ = worker.send(&end_data).await;
                            }
                        },
                        _=sleep(Duration::from_secs(3))=>{
                                let _ = worker.send(&end_data).await;
                        }
                    }
                });
            }
        }

On the worker side, from primary to remote:

            Cmd::Data => {
                if let Some(remote) = conns
                    .read()
                    .await
                    .get(parts.client_id().to_string().as_str())
                    .cloned()
                {
                    let data = parts.data().to_vec();
                    tokio::spawn(async move {
                        //upload
                        let _ = remote.lock().await.write_all(&data).await;
                    });
                } else {
                    let _ = sock
                        .send(&concat!(&[Cmd::End.into()], parts.client_id_bytes()))
                        .await;
                }
            }
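Separately from UDP itself, the download path on the primary and the upload path on the worker both call `tokio::spawn` once per data chunk, and each task then locks the connection and writes. Spawned tasks are not guaranteed to reach `lock()` in the order they were spawned (especially on the multi-threaded runtime), so consecutive chunks could be written out of order even if every datagram arrived in order. A common alternative is one long-lived writer per connection, fed by a channel. The sketch below shows the idea with `std::thread` and `std::sync::mpsc` so it runs without dependencies; in the async code this would presumably be one tokio task per connection reading from a `tokio::sync::mpsc` channel. `spawn_ordered_writer` is a hypothetical helper, not part of the poster's code.

```rust
use std::io::Write;
use std::sync::mpsc;
use std::thread;

/// Spawns a single writer for one connection. Chunks sent into the returned
/// channel are written strictly in send order, because only one thread
/// drains the channel. The writer is handed back when the channel closes.
fn spawn_ordered_writer<W: Write + Send + 'static>(
    mut out: W,
) -> (mpsc::Sender<Vec<u8>>, thread::JoinHandle<W>) {
    let (tx, rx) = mpsc::channel::<Vec<u8>>();
    let handle = thread::spawn(move || {
        // `recv` returns Err once every Sender has been dropped.
        while let Ok(chunk) = rx.recv() {
            if out.write_all(&chunk).is_err() {
                // A real proxy might send Cmd::End to the other side here.
                break;
            }
        }
        out
    });
    (tx, handle)
}
```

Callers then send each received chunk into the channel instead of spawning a task per chunk, which keeps the per-connection byte order intact.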

And from remote to primary:

//remote buf size 50kb
//worker buf size 100kb

        while let Ok(n) = reader.read(&mut buf).await {
            if write_ref.upgrade().is_none() {
                //write end dropped
                println!("write handle dropped killing");
                break;
            }
            //download
            if n == 0 {
                break;
            }
            let _ = proxy
                .send(&concat!(
                    &[Cmd::Data.into()],
                    &conn_id,
                    &(n as u16).to_be_bytes(),
                    &buf[..n]
                ))
                .await;
            // sleep(Duration::from_millis(10)).await;
        }
        //send end
        let _ = proxy.send(&concat!(&[Cmd::End.into()], &conn_id)).await;

It feels like I am missing something, but I can't put my finger on it.
Any help, including paid support, will be appreciated; I have been struggling with this for a week now.
Also, I am pretty confident that my protocol serialization/deserialization works, because, as I mentioned, when I add a 50 ms sleep things work fine.

UDP gives few guarantees about anything: datagrams can arrive in any order, be duplicated, or go missing.
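This also fits the symptom: UDP has no flow control, so when the sender outruns the receiver, the kernel's receive buffer fills up and further datagrams are dropped silently, which a sleep between sends would hide. To illustrate what handling this yourself involves, here is a sketch of a receive-side reorder buffer. It assumes each datagram carries a per-connection sequence number, which is NOT part of the poster's protocol. It fixes reordering and duplicates, but a lost datagram leaves it waiting forever; recovering from loss needs acknowledgements and retransmission, which amounts to re-implementing TCP.

```rust
use std::collections::BTreeMap;

/// Hypothetical reorder buffer keyed by an assumed per-datagram sequence number.
struct Reorder {
    /// Sequence number of the next payload to deliver.
    next: u64,
    /// Payloads that arrived ahead of a missing predecessor.
    pending: BTreeMap<u64, Vec<u8>>,
}

impl Reorder {
    fn new() -> Self {
        Reorder { next: 0, pending: BTreeMap::new() }
    }

    /// Accepts one datagram and returns every payload that can now be
    /// delivered in order. Already-delivered sequence numbers are ignored,
    /// and a duplicate of a pending one keeps the first copy.
    fn push(&mut self, seq: u64, payload: Vec<u8>) -> Vec<Vec<u8>> {
        if seq >= self.next {
            self.pending.entry(seq).or_insert(payload);
        }
        let mut ready = Vec::new();
        while let Some(p) = self.pending.remove(&self.next) {
            ready.push(p);
            self.next += 1;
        }
        ready
    }

    /// Number of datagrams stuck behind a gap (a lost datagram keeps this > 0).
    fn waiting(&self) -> usize {
        self.pending.len()
    }
}
```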

I am using UDP here because each datagram arrives as one whole message, which gives me framing for free.
What would you suggest in this case?

I would continue using TCP.

I have a previous version using TCP, but multiplexing connections over a single TCP connection is really hard: every now and then some bytes slip, the entire connection goes bad, and that costs tens of reconnections.

You can get framing on TCP by using tokio_util::codec. The built-in LengthDelimitedCodec can do it rather easily if your frames are just byte chunks.
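As an illustration of what length-delimited framing does on the wire, here is a std-only sketch with blocking `Read`/`Write`. It is not the tokio_util API; it writes a 4-byte big-endian length prefix, which to my understanding matches LengthDelimitedCodec's default. The key point for the "bytes slip" problem is that a single `read` may return only part of a frame, whereas `read_exact` keeps reading until the requested number of bytes has arrived. tokio's `AsyncReadExt::read_exact` behaves the same way in async code.

```rust
use std::convert::TryFrom;
use std::io::{self, Read, Write};

/// Writes one frame: a 4-byte big-endian length followed by the payload.
fn write_frame<W: Write>(w: &mut W, payload: &[u8]) -> io::Result<()> {
    let len = u32::try_from(payload.len())
        .map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "frame too large"))?;
    w.write_all(&len.to_be_bytes())?;
    w.write_all(payload)
}

/// Reads one frame. `read_exact` reassembles a frame that arrives split
/// across several TCP segments instead of misparsing it. `max_len` guards
/// against allocating a huge buffer from a corrupted length field.
fn read_frame<R: Read>(r: &mut R, max_len: usize) -> io::Result<Vec<u8>> {
    let mut len_buf = [0u8; 4];
    r.read_exact(&mut len_buf)?;
    let len = u32::from_be_bytes(len_buf) as usize;
    if len > max_len {
        return Err(io::Error::new(io::ErrorKind::InvalidData, "frame too large"));
    }
    let mut payload = vec![0u8; len];
    r.read_exact(&mut payload)?;
    Ok(payload)
}
```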

Hm. Can you use SCTP?

That is interesting, I will look into it, thanks 🙂

My frames have a header and a variable length.

The codec module can handle variable length frames no problem. It's designed for that.
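The core of a codec decoder for a header plus variable-length frame is "look at the buffered bytes; if a whole frame is there, take it off the front, otherwise wait for more". Below is a std-only sketch of that logic for the header from the question (1-byte command, 32-byte client id, 2-byte big-endian length, data), using a plain `Vec<u8>` rather than tokio_util's `Decoder` trait and `BytesMut`. It assumes every frame carries the length field; the question's `Cmd::End` frames have no length, so a real decoder would first dispatch on the command byte. `Frame` and `decode` are hypothetical names.

```rust
/// cmd (1) + client_id (32) + length (2)
const HEADER_LEN: usize = 1 + 32 + 2;

#[derive(Debug, PartialEq)]
struct Frame {
    cmd: u8,
    client_id: [u8; 32],
    data: Vec<u8>,
}

/// Tries to take one complete frame off the front of `buf`.
/// Returns None, leaving `buf` untouched, when more bytes are needed.
fn decode(buf: &mut Vec<u8>) -> Option<Frame> {
    if buf.len() < HEADER_LEN {
        return None; // header not complete yet
    }
    let len = u16::from_be_bytes([buf[33], buf[34]]) as usize;
    if buf.len() < HEADER_LEN + len {
        return None; // payload not complete yet
    }
    let cmd = buf[0];
    let mut client_id = [0u8; 32];
    client_id.copy_from_slice(&buf[1..33]);
    let data = buf[HEADER_LEN..HEADER_LEN + len].to_vec();
    // Remove exactly one frame; any bytes of the next frame stay buffered.
    buf.drain(..HEADER_LEN + len);
    Some(Frame { cmd, client_id, data })
}
```

The read loop appends whatever each `read` returns to `buf` and calls `decode` until it returns None, so partial reads never desynchronize the stream.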

I will look into both codec and SCTP, which looks interesting. Thanks @alice @s3bk, I will comment here later once things work smoothly and stably.

It might also be worthwhile to look into the QUIC protocol: QUIC - Wikipedia

@alice @s3bk I looked into both SCTP and codec. SCTP looks amazing; however, there is no production-ready tokio implementation.
As for codec, it seems to be what I need, but then I realized I already had the parsing and serializing methods ready, so I just wrote a recv_tcp function. So far things seem to be really smooth. Thank you both for the ideas.