I have a weird error, I am even not sure what can be the reason of that. In short, read/write methods in my TCP client not work without sleep
at the end of the loop. Client just stay frozen, seems like one of .lock()
not unlocked.
So, this is my code:
const READ_TIMEOUT: u64 = 250;
const WRITE_TIMEOUT: u64 = 1;
pub struct Reader {
stream: OwnedReadHalf,
// ...
}
pub struct Writer {
stream: OwnedWriteHalf,
// ...
}
// initial connection
pub async fn connect(&mut self, host: &str, port: u16) -> Result<(), Error> {
// ...
return match Self::connect_inner(host, port).await {
Ok(stream) => {
Self::set_stream_halves(stream, &self._reader, &self._writer).await;
// ...
},
Err(err) => {
// ...
},
}
}
async fn connect_inner(host: &str, port: u16) -> Result<TcpStream, Error> {
let addr = format!("{}:{}", host, port);
match TcpStream::connect(&addr).await {
Ok(stream) => Ok(stream),
Err(err) => Err(err),
}
}
async fn set_stream_halves(
stream: TcpStream,
reader: &Arc<Mutex<Option<Reader>>>,
writer: &Arc<Mutex<Option<Writer>>>
) {
let (rx, tx) = stream.into_split();
let mut reader = reader.lock().await;
*reader = Some(Reader::new(rx));
let mut writer = writer.lock().await;
*writer = Some(Writer::new(tx));
}
// RECONNECT handles below
fn handle_queue(&mut self) -> JoinHandle<()> {
// reader and writer are Arc<Mutex<Option<Reader or Writer>>>
let reader = Arc::clone(&self._reader);
let writer = Arc::clone(&self._writer);
tokio::spawn(async move {
loop {
let packets = input_queue.lock().unwrap().pop_front();
if packets.is_some() {
for packet in packets.unwrap() {
// ...
for mut handler in handler_list {
let response = handler.handle(&mut input).await;
match response {
Ok(output) => {
match output {
HandlerOutput::Data((opcode, header, body)) => {},
HandlerOutput::ConnectionRequest(host, port) => {
match Self::connect_inner(&host, port).await {
Ok(stream) => {
Self::set_stream_halves(
stream, &reader, &writer
).await;
message_income.send_success_message(message);
},
Err(err) => {
message_income.send_error_message(
err.to_string()
);
}
}
},
HandlerOutput::UpdateState(state) => {},
HandlerOutput::Freeze => {},
HandlerOutput::Void => {},
};
},
Err(err) => {},
};
sleep(Duration::from_millis(WRITE_TIMEOUT)).await;
}
}
}
}
})
}
fn handle_write(&mut self) -> JoinHandle<()> {
// ...
let output_queue = Arc::clone(&self._output_queue);
let writer = Arc::clone(&self._writer);
tokio::spawn(async move {
loop {
match &mut *writer.lock().await {
Some(writer) => {
let packet = output_queue.lock().unwrap().pop_front();
if packet.is_some() {
message_income.send_success_message(format!("HAS PACKET"));
let packet = packet.unwrap();
if !packet.is_empty() {
match writer.write(&packet).await {
Ok(bytes_amount) => {},
Err(err) => {}
};
}
}
},
None => {},
};
sleep(Duration::from_millis(WRITE_TIMEOUT)).await;
}
})
}
fn handle_read(&mut self) -> JoinHandle<()> {
// ...
let input_queue = Arc::clone(&self._input_queue);
let reader = Arc::clone(&self._reader);
let reconnect_signal_receiver = Arc::clone(&self._reconnect_signal_receiver);
tokio::spawn(async move {
loop {
// get signal on reconnect
let receiver = &mut *reconnect_signal_receiver.lock().await;
tokio::select! {
_ = receiver.recv() => {},
Some(packets) = Self::read_packets(&reader) => {
input_queue.lock().unwrap().push_back(packets);
},
};
// if I comment this client just frozen after reconnect
sleep(Duration::from_millis(READ_TIMEOUT)).await;
}
})
}
async fn read_packets(reader: &Arc<Mutex<Option<Reader>>>) -> Option<Vec<Vec<u8>>> {
if let Some(reader) = &mut *reader.lock().await {
if let Some(packets) = reader.read().await.ok() {
if !packets.is_empty() {
return Some(packets);
}
}
}
None
}
My questions:
- Could somebody help me to find what the reason of the issue ?
- Why
sleep
helps to makehandle_read
andhandle_write
working (timeout should be set for both)? - sometimes
handle_write
sends packet in wrong order, what potentially can affect the order in my case ?