My TCP client use Reader
and Writer
structs to read/write packets, this is how they defined:
pub struct Reader {
stream: OwnedReadHalf,
// ...
}
pub struct Writer {
stream: OwnedWriteHalf,
// ...
}
by request client sometimes reconnect to another IP addr:
// initial connection
pub async fn connect(&mut self, host: &str, port: u16) -> Result<(), Error> {
// ...
return match Self::connect_inner(host, port).await {
Ok(stream) => {
Self::set_stream_halves(stream, &self._reader, &self._writer).await;
// ...
},
Err(err) => {
// ...
},
}
}
async fn connect_inner(host: &str, port: u16) -> Result<TcpStream, Error> {
let addr = format!("{}:{}", host, port);
match TcpStream::connect(&addr).await {
Ok(stream) => Ok(stream),
Err(err) => Err(err),
}
}
async fn set_stream_halves(
stream: TcpStream,
reader: &Arc<Mutex<Option<Reader>>>,
writer: &Arc<Mutex<Option<Writer>>>
) {
let (rx, tx) = stream.into_split();
let mut reader = reader.lock().await;
*reader = Some(Reader::new(rx));
let mut writer = writer.lock().await;
*writer = Some(Writer::new(tx));
}
// RECONNECT handles below
fn handle_queue(&mut self) -> JoinHandle<()> {
// reader and writer are Arc<Mutex<Option<Reader or Writer>>>
let reader = Arc::clone(&self._reader);
let writer = Arc::clone(&self._writer);
tokio::spawn(async move {
loop {
let packets = input_queue.lock().unwrap().pop_front();
if packets.is_some() {
for packet in packets.unwrap() {
// ...
for mut handler in handler_list {
let response = handler.handle(&mut input).await;
match response {
Ok(output) => {
match output {
HandlerOutput::Data((opcode, header, body)) => {},
HandlerOutput::ConnectionRequest(host, port) => {
match Self::connect_inner(&host, port).await {
Ok(stream) => {
let message = format!(
"Connected to {}:{}", host, port
);
Self::set_stream_halves(
stream, &reader, &writer
).await;
message_income.send_success_message(message);
},
Err(err) => {
message_income.send_error_message(
err.to_string()
);
}
}
},
HandlerOutput::UpdateState(state) => {},
HandlerOutput::Freeze => {},
HandlerOutput::Void => {},
};
},
Err(err) => {},
};
sleep(Duration::from_millis(WRITE_TIMEOUT)).await;
}
}
}
}
})
}
the issue with code is that when I reconnect to another IP:
match Self::connect_inner(&host, port).await {
Ok(stream) => {
Self::set_stream_halves(stream, &reader, &writer).await;
},
Err(err) => {}
}
my handle_read
task still use previous connection:
fn handle_read(&mut self) -> JoinHandle<()> {
// ...
let reader = Arc::clone(&self._reader);
tokio::spawn(async move {
loop {
match &mut *reader.lock().await {
Some(reader) => {
if let Some(packets) = reader.read().await.ok() {
input_queue.lock().unwrap().push_back(packets);
}
},
None => {},
};
sleep(Duration::from_millis(READ_TIMEOUT)).await;
}
})
}
Could somebody advice what can I do in this case ? Probably drop connection in some way, or restart the reader task or smth else ?