Spam-sending packets through WebSockets causes some packets to drop

Similar to r2d2, I created a connection pool (below, called pool) that stores Addr<ClientSessionVehicle> handles. The actor has a handler for BytesWrapper (an impl Message); on reception, the bytes are written to the underlying websocket stream. I needed to lock down the calling closure (i.e., try_send) to prevent two writes at once, because otherwise most of the packets would not even get sent: only 1/12 packets is received by the server (specifically, the last of the 12 sent is received, but the first 11 are not). With the lock() mechanism, 10/12 packets sent consecutively in a for loop (by calling try_send) make it through. When I add the sleep() mechanism on top, all 12/12 packets get sent.

What's happening that forces me to do funky stuff just to allow consecutive sends of packets? Does this have to do with TCP_NODELAY or something similar? A TCP timer, maybe? This is strange, but it would make some sense, because Nagle's algorithm is meant to prevent small-packet spam on a network; maybe it's overdoing it here? (See also the note on try_send's failure modes after the code below.)

use std::sync::{Arc, Mutex};

use actix::Addr;

// BytesWrapper, ClientSessionVehicle, and ClientContainer are
// defined elsewhere in the app.

#[derive(Clone, Debug)]
pub struct SkyscraperClient {
    pool: Arc<Vec<Addr<ClientSessionVehicle>>>,
    lock: Arc<Mutex<()>>,
    index: usize,
}

impl SkyscraperClient {
    /// This will panic if a connection cannot be established
    pub async fn new(skyscraper_socket_addr: &'static str, skyscraper_url_connect: &'static str, pool_count: usize) -> Self {
        let mut pool = Vec::with_capacity(pool_count);
        for _ in 0..pool_count {
            let client = ClientContainer::connect_to_client(
                skyscraper_socket_addr,
                skyscraper_url_connect,
                None,
            ).await.unwrap();
            pool.push(client);
        }

        Self { pool: Arc::new(pool), lock: Arc::new(Mutex::new(())), index: 0 }
    }

    /// This ensures that only one send occurs at a time by holding the
    /// lock for the entire function body; the guard is dropped on return
    pub fn try_send(&mut self, packet: BytesWrapper) -> Result<(), ()> {
        // place a lock (treat a poisoned mutex as an error)
        let _lock = self.lock.lock().map_err(|_| ())?;
        if self.index == self.pool.len() {
            // wrap back around to the first connection
            self.index = 0;
        }

        self.pool
            .get(self.index)
            .ok_or(())?
            .try_send(packet)
            .map_err(|_| ())?;
        self.index += 1;
        // std::thread::sleep(std::time::Duration::from_millis(100));
        Ok(())
    }

    /// Returns a handle to this client (just a clone of self)
    pub fn get_inner(&self) -> SkyscraperClient {
        self.clone()
    }
}
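
One thing worth noting about actix's Addr::try_send: according to the docs, it fails immediately when the recipient's mailbox is full or the actor has stopped, handing the undelivered message back inside the error. My try_send above discards which case occurred; here is a minimal sketch that distinguishes them (with a stub actor standing in for the real ClientSessionVehicle):

use actix::prelude::*;
use bytes::Bytes;

#[derive(Message)]
#[rtype(result = "()")]
pub struct BytesWrapper(pub Bytes);

// Stub stand-in for the real ClientSessionVehicle actor
struct ClientSessionVehicle;

impl Actor for ClientSessionVehicle {
    type Context = Context<Self>;
}

impl Handler<BytesWrapper> for ClientSessionVehicle {
    type Result = ();

    fn handle(&mut self, _msg: BytesWrapper, _ctx: &mut Context<Self>) {
        // the real handler writes the bytes to the websocket sink
    }
}

fn send_or_report(addr: &Addr<ClientSessionVehicle>, msg: BytesWrapper) {
    match addr.try_send(msg) {
        Ok(()) => {}
        // Mailbox at capacity: the message was NOT delivered and is
        // handed back inside the error
        Err(SendError::Full(_msg)) => eprintln!("mailbox full; message dropped"),
        // The actor has stopped, e.g. its connection closed
        Err(SendError::Closed(_msg)) => eprintln!("actor stopped"),
    }
}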

This is how I'm calling it:

        let mut client2 = skyscraper_client.clone().unwrap();
        std::thread::spawn(move || {
            for x in 0..12 {
                let val_to_send = format!("0Hello world from WebApp! (#: {})", x);
                client2
                    .try_send(BytesWrapper(Bytes::from(val_to_send)))
                    .unwrap();
            }
        });
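
For comparison: if the drops come from mailbox backpressure rather than from TCP, I assume that awaiting Addr::send (instead of calling try_send) would avoid them, since send waits for the handler to process the message. A minimal sketch under that assumption, reusing the BytesWrapper and ClientSessionVehicle types and assuming direct access to a single Addr from the pool:

// Sketch only: assumes the BytesWrapper message and the
// ClientSessionVehicle actor defined above are in scope.
async fn send_all(addr: actix::Addr<ClientSessionVehicle>) {
    for x in 0..12 {
        let val_to_send = format!("0Hello world from WebApp! (#: {})", x);
        // send() resolves once the handler has processed the message,
        // so nothing is rejected for lack of mailbox capacity
        addr.send(BytesWrapper(bytes::Bytes::from(val_to_send)))
            .await
            .expect("actor mailbox closed");
    }
}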

Also: the reason I'm using a connection pool is likewise a response to the packet drops; it was my first attempt at solving the issue. I thought, "Maybe if I have concurrent connections, each will have a unique socket bind address and thus a unique TCP connection." But this did not fix the problem, so I moved to the closure-locking method. That got 10/12 through, which is not satisfactory, but better than 1/12. Thereafter, I applied the lock plus the 100ms wait, and that seemed to work. In any case, the packets are not even received in order, though that is perhaps expected: ordering is only guaranteed within a single TCP connection, not across the connections in the pool.

No wait: 1/12 packets make it to the server
closure lock: 10/12 packets make it to the server
closure lock + 100ms wait: 12/12 packets make it to the server

Also: These tests are all on localhost; the client and server are bound to different ports on localhost, if that makes a difference
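
For reference: if Nagle's algorithm were the culprit, it can be disabled per socket via TCP_NODELAY, which std exposes as set_nodelay. The websocket stream here is owned by the actix client rather than by me, so this is only an illustration of the knob itself (the address is a placeholder):

use std::net::TcpStream;

fn main() -> std::io::Result<()> {
    // Placeholder address; something must be listening for connect to succeed
    let stream = TcpStream::connect("127.0.0.1:8080")?;
    // Disable Nagle's algorithm so small writes are flushed immediately
    // instead of being coalesced while waiting for outstanding ACKs
    stream.set_nodelay(true)?;
    Ok(())
}

That said, on loopback the ACKs return almost immediately, so I'd expect Nagle alone to be unlikely to hold packets back for long here.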
