I am writing a proxy multiplexer. The basic idea: a primary accepts SOCKS5 proxy requests from clients and hands each job to a worker; the worker connects to the remote site and sends the response back to the primary, which relays it to the client.
The worker and the primary talk over a UDP socket, using a simple wire protocol to serialize and distinguish the data. I know the serialization itself works, because when there are sleeps before the writes, data integrity is fine.
client <--tcp--> my primary <--udp--> my worker <--tcp--> remote site
Here's my send/receive implementation:
impl WorkerConn {
    pub async fn recv(&self, buf: &mut [u8]) -> io::Result<usize> {
        let (n, addr) = self.sock.recv_from(buf).await?;
        if addr != self.addr {
            dbg!("unauthenticated connection from worker socket");
            return Err(io::Error::new(io::ErrorKind::NotConnected, "unauthenticated"));
        }
        Ok(n)
    }
    pub async fn send(&self, buf: &[u8]) -> io::Result<usize> {
        let n = self.sock.send_to(buf, self.addr).await?;
        //with this sleep things work seemingly fine, at the cost of transfer speed
        // sleep(Duration::from_millis(50)).await;
        if buf.len() != n {
            println!("not eq");
            abort();
        }
        Ok(n)
    }
}
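(For reference, a std-only demonstration of a UDP pitfall that bites exactly this kind of wrapper: on Linux, `recv_from` silently truncates a datagram that doesn't fit the buffer and reports only the bytes that fit; the helper name here is made up.)

```rust
use std::net::UdpSocket;
use std::time::Duration;

// Shows that recv_from silently truncates a datagram larger than the
// receive buffer (Linux behavior; Windows returns an error instead):
// the reported byte count is the buffer size and the tail is gone.
fn recv_truncation_demo() -> std::io::Result<(usize, usize)> {
    let rx = UdpSocket::bind("127.0.0.1:0")?;
    rx.set_read_timeout(Some(Duration::from_secs(2)))?;
    let tx = UdpSocket::bind("127.0.0.1:0")?;

    let payload = vec![0xAB; 2048]; // one 2 KiB datagram
    tx.send_to(&payload, rx.local_addr()?)?;

    let mut buf = [0u8; 512]; // deliberately too small
    let (n, _) = rx.recv_from(&mut buf)?;
    Ok((payload.len(), n)) // n == 512 on Linux, not 2048
}
```

With a 50 kB client buffer feeding ~50 kB frames into a 100 kB worker buffer this particular failure shouldn't trigger, but it's worth ruling out on every recv path.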
This is how I read from the client and send to the worker:
loop {
if client_writer.upgrade().is_none() {
//client writer dropped; stop upload
println!("client writer dropped");
break;
}
//client buf size 50kb
//worker buf size 100kb
    let Ok(len) = cr.read(&mut buf).await else {
        conns.write().await.remove(&client_id);
        return;
    };
    if len == 0 {
        conns.write().await.remove(&client_id);
        return;
    }
    let len = len as u16;
let cmd_msg: Vec<u8> = concat![
&[Cmd::Data.into()],
client_id.as_bytes(),
&len.to_be_bytes(),
&buf[..len as usize]
];
//CmdData | client_id | length | data |
// 1octet | 32octets | 2octets | n octets | 35+
let _ = worker.send(&cmd_msg).await;
}
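For clarity, the frame layout in the comment above is equivalent to this std-only sketch (0x01 is a stand-in value for `Cmd::Data`, which isn't shown here); the decoder rejects frames whose declared length doesn't match the bytes actually received, which is a cheap way to spot truncation:

```rust
// Hypothetical command byte; the real Cmd enum values aren't shown above.
const CMD_DATA: u8 = 0x01;
const ID_LEN: usize = 32; // client_id is 32 octets per the layout comment
const HDR_LEN: usize = 1 + ID_LEN + 2; // cmd + id + big-endian length

// Encode: CmdData | client_id | length | data
fn encode_data(client_id: &[u8; ID_LEN], data: &[u8]) -> Vec<u8> {
    let mut out = Vec::with_capacity(HDR_LEN + data.len());
    out.push(CMD_DATA);
    out.extend_from_slice(client_id);
    out.extend_from_slice(&(data.len() as u16).to_be_bytes());
    out.extend_from_slice(data);
    out
}

// Decode one datagram; a truncated or corrupted frame shows up as a
// mismatch between the declared length and the bytes actually present.
fn decode_data(frame: &[u8]) -> Option<(&[u8], &[u8])> {
    if frame.len() < HDR_LEN || frame[0] != CMD_DATA {
        return None;
    }
    let id = &frame[1..1 + ID_LEN];
    let len = u16::from_be_bytes([frame[1 + ID_LEN], frame[2 + ID_LEN]]) as usize;
    let data = &frame[HDR_LEN..];
    if data.len() != len {
        return None; // short read: datagram was truncated
    }
    Some((id, data))
}
```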
And this is from worker to client:
Cmd::Data => {
//download
let client = clients
.read()
.await
.get(parts.client_id().to_string().as_str())
.cloned();
if client.is_none() {
let _ = worker
.send(&concat!(&[Cmd::End.into()], parts.client_id_bytes()))
.await;
return Some(parts);
}
let client = client.unwrap();
{
let worker = worker.clone();
let end_data: Vec<u8> = concat!(&[Cmd::End.into()], parts.client_id_bytes());
let data = parts.data().to_vec();
dbg!("download", data.len());
tokio::spawn(async move {
//client write_all might take forever; it should not affect the other users
let mut client = client.lock().await;
select! {
r = client.write_all(&data) =>{
if r.is_err() {
let _ = worker.send(&end_data).await;
}
},
_=sleep(Duration::from_secs(3))=>{
let _ = worker.send(&end_data).await;
}
}
});
}
}
On the worker side, this is from primary to remote:
Cmd::Data => {
if let Some(remote) = conns
.read()
.await
.get(parts.client_id().to_string().as_str())
.cloned()
{
let data = parts.data().to_vec();
tokio::spawn(async move {
//upload
let _ = remote.lock().await.write_all(&data).await;
});
} else {
let _ = sock
.send(&concat!(&[Cmd::End.into()], parts.client_id_bytes()))
.await;
}
}
And this is from remote to primary:
//remote buf size 50kb
//worker buf size 100kb
while let Ok(n) = reader.read(&mut buf).await {
if write_ref.upgrade().is_none() {
//write end dropped
println!("write handle dropped killing");
break;
}
//download
if n == 0 {
break;
}
let _ = proxy
.send(&concat!(
&[Cmd::Data.into()],
&conn_id,
&(n as u16).to_be_bytes(),
&buf[..n]
))
.await;
// sleep(Duration::from_millis(10)).await;
}
//send end
let _ = proxy.send(&concat!(&[Cmd::End.into()], &conn_id)).await;
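One idea I've been toying with: the ~50 kB datagrams above are far past a typical 1500-byte MTU, so each one gets IP-fragmented, and losing any single fragment drops the whole datagram. A rough std-only sketch (constants are hypothetical) that splits one read into frames small enough to fit a single unfragmented packet:

```rust
// Hypothetical budget: keep each datagram under a conservative 1400 bytes,
// minus the 35-byte header (cmd 1 + client_id 32 + length 2).
const MAX_DATAGRAM: usize = 1400;
const HEADER: usize = 35;
const CHUNK: usize = MAX_DATAGRAM - HEADER;

// Split one TCP read into several small frames instead of one huge one.
fn chunk_frames(conn_id: &[u8; 32], cmd_data: u8, buf: &[u8]) -> Vec<Vec<u8>> {
    buf.chunks(CHUNK)
        .map(|c| {
            let mut f = Vec::with_capacity(HEADER + c.len());
            f.push(cmd_data);
            f.extend_from_slice(conn_id);
            f.extend_from_slice(&(c.len() as u16).to_be_bytes());
            f.extend_from_slice(c);
            f
        })
        .collect()
}
```

Each frame then survives or dies independently, though ordering and loss would still have to be handled above UDP.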
It feels like I am missing some point, but I can't put my finger on it.
Any help, including paid support, would be appreciated; I have been struggling with this for a week now.
Also, I am pretty confident that my protocol serialization/deserialization works fine because, as I mentioned, when I add a ~50 ms sleep everything works.