Similar to r2d2, I created a connection pool below called pool
that stores an Addr<ClientSessionVehicle?
that has a handler (impl Message
) for BytesWrapper
that, on actor reception, the bytes are wrote to the underlying websocket stream. I needed to lock-down the calling closure (i.e., try_send
to prevent two writes at once otherwise many of the packets would not even get sent (1/12 packets gets received by the server; especially, the last of the 12 sent gets received, but the first 11 dont get sent). By using the lock() mechanism, 10/12 packets sent consecutively in a for loop (by calling try_send). When I add the sleep() mechanism, all 12/12 packets get sent. What's happening that's forcing me to due funky stuff just to allow consecutive sends of packets? Does this have to do with TCP_NODELAY or something similar? A TCP timer maybe? This is strange, but it does make sense because nagel's algorithm is used to prevent spam on a network, but maybe it's over-doing it here?
#[derive(Clone, Debug)]
pub struct SkyscraperClient {
pool: Arc<Vec<Addr<ClientSessionVehicle>>>,
lock: Arc<Mutex<()>>,
index: usize
}
impl SkyscraperClient {
/// This will panic if a connection cannot be established
pub async fn new(skyscraper_socket_addr: &'static str, skyscraper_url_connect: &'static str, pool_count: usize) -> Self {
let mut pool = Vec::with_capacity(pool_count);
for _ in 0..pool_count {
let client = ClientContainer::connect_to_client(
skyscraper_socket_addr,
skyscraper_url_connect,
None,
).await.unwrap();
pool.push(client);
}
Self {pool: Arc::new(pool), lock: Arc::new(Mutex::new(())), index: 0}
}
/// This will ensure that one send occurs at a time by locking the entire closure.
/// This drops the closure-lock at the end of the function
pub fn try_send(&mut self, packet: BytesWrapper) -> Result<(), ()> {
// place a lock
let lock = self.lock.lock()?;
if self.index == self.pool.len() {
// reset
self.index = 0;
}
self.pool.get(self.index)?.try_send(packet)?;
self.index += 1;
// std::thread::sleep_ms(100);
Ok(())
}
pub fn get_inner(&self) -> SkyscraperClient {
self.clone()
}
}
This is how I'm calling it:
let mut client2 = skyscraper_client.clone().unwrap();
std::thread::spawn(move || {
for x in 0..12 {
let val_to_send = format!("0Hello world from WebApp! (#: {})", x);
client2
.try_send(BytesWrapper(Bytes::from(val_to_send)))
.unwrap();
}
});