In my TCP client I have multiple threads that handle responses. To keep things non-blocking, I split the TcpStream into a separate reader and writer. Under some conditions I need to replace the reader and writer by calling connect again, but it seems that reader.lock().await and writer.lock().await never complete, because the locks were already taken earlier in another task (in the handle_read and handle_write methods).
This is my code:
pub struct Reader {
    stream: OwnedReadHalf,
    // ... rest fields
}

pub struct Writer {
    stream: OwnedWriteHalf,
    // ... rest fields
}

pub struct Client {
    reader: Arc<Mutex<Option<Reader>>>,
    writer: Arc<Mutex<Option<Writer>>>,
    // ... rest fields
}

impl Client {
    pub async fn connect(&mut self, host: &str, port: u16) {
        let stream = Self::connect_inner(host, port).await;
        let (rx, tx) = stream.into_split();

        let mut reader = self.reader.lock().await;
        *reader = Some(Reader::new(rx));
        let mut writer = self.writer.lock().await;
        *writer = Some(Writer::new(tx));
    }

    async fn connect_inner(host: &str, port: u16) -> TcpStream {
        let addr = format!("{}:{}", host, port);
        match TcpStream::connect(&addr).await {
            Ok(stream) => {
                println!("Connected to {}", addr);
                stream
            },
            _ => {
                panic!("Cannot connect");
            },
        }
    }

    pub async fn handle_connection(&mut self) {
        loop {
            timeout(Duration::from_millis(TIMEOUT), self.handle_read()).await;
            self.handle_queue().await;
            timeout(Duration::from_millis(TIMEOUT), self.handle_write()).await;
        }
    }

    async fn handle_queue(&mut self) {
        let reader = Arc::clone(&self.reader);
        let writer = Arc::clone(&self.writer);
        tokio::spawn(async move {
            // ... rest code
            let stream = Self::connect_inner(&host, port).await;
            let (rx, tx) = stream.into_split();
            // this await never finished
            let mut reader = reader.lock().await;
            *reader = Some(Reader::new(rx));
            let mut writer = writer.lock().await;
            *writer = Some(Writer::new(tx));
            // ... rest code
        }).await.unwrap()
    }

    async fn handle_read(&mut self) {
        let reader = Arc::clone(&self.reader);
        tokio::spawn(async move {
            let lock = reader.lock();
            match &mut *lock.await {
                Some(reader) => {
                    let raw_data = reader.read().await.unwrap();
                    // ... rest code
                },
                _ => {},
            }
        }).await.unwrap()
    }

    async fn handle_write(&mut self) {
        let writer = Arc::clone(&self.writer);
        // ... rest code
        tokio::spawn(async move {
            match output_queue.lock().await.pop_front() {
                Some(packet) => {
                    if !packet.is_empty() {
                        let lock = writer.lock();
                        match &mut *lock.await {
                            Some(writer) => {
                                writer.write(&packet).await;
                            },
                            _ => {},
                        }
                    }
                },
                _ => {},
            }
        }).await.unwrap()
    }
}
My first idea was to set a read/write timeout, but tokio does not seem to have a set_timeout method on the stream, so I tried tokio::time::timeout instead (inside the handle_connection method). This did not help: reader.lock().await and writer.lock().await still never return:
async fn handle_queue(&mut self) {
    let reader = Arc::clone(&self.reader);
    let writer = Arc::clone(&self.writer);
    tokio::spawn(async move {
        // ... rest code
        let stream = Self::connect_inner(&host, port).await;
        let (rx, tx) = stream.into_split();
        // this await never finished
        let mut reader = reader.lock().await;
        *reader = Some(Reader::new(rx));
        let mut writer = writer.lock().await;
        *writer = Some(Writer::new(tx));
        // ... rest code
    }).await.unwrap()
}
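To illustrate what seems to be going on, here is a minimal sketch that uses only std threads and channels, so it runs without tokio. Everything in it is an assumption made for illustration: a channel receiver stands in for the socket half stored in Option<Reader>, and silent_connection, spawn_reader and the two check functions are hypothetical names, not part of my client. The idea it tries to show is that a guard held while a read is pending keeps every other locker waiting, and that putting the deadline on the read itself lets the guard be released. A tokio::sync::Mutex guard held across .await keeps the lock in the same way, and a timeout wrapped around the JoinHandle returned by tokio::spawn only stops waiting for the task, it does not cancel the task that holds the guard.

```rust
use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};
use std::time::Duration;

// Hypothetical stand-in for Arc<Mutex<Option<Reader>>>: the receiver plays the socket.
type Shared = Arc<Mutex<Option<Receiver<u8>>>>;

// Creates a "connection" on which no data ever arrives while the sender is alive.
fn silent_connection() -> (Sender<u8>, Shared) {
    let (tx, rx) = mpsc::channel();
    (tx, Arc::new(Mutex::new(Some(rx))))
}

// Reader thread: takes the lock, signals that it holds it, then reads.
// With `deadline == None` the read blocks for as long as the peer stays silent.
fn spawn_reader(shared: Shared, deadline: Option<Duration>, ready: Sender<()>) -> JoinHandle<()> {
    thread::spawn(move || {
        let guard = shared.lock().unwrap();
        ready.send(()).unwrap();
        if let Some(rx) = &*guard {
            match deadline {
                Some(d) => { let _ = rx.recv_timeout(d); }
                None => { let _ = rx.recv(); }
            }
        }
        // `guard` is dropped only here, after the read has returned.
    })
}

// Unbounded read: can anyone else take the lock while the reader waits for data?
fn lock_free_during_unbounded_read() -> bool {
    let (peer, shared) = silent_connection();
    let (ready_tx, ready_rx) = mpsc::channel();
    let reader = spawn_reader(Arc::clone(&shared), None, ready_tx);
    ready_rx.recv().unwrap(); // the reader now holds the guard
    let free = shared.try_lock().is_ok();
    drop(peer); // closing the peer is the only thing that ends the read
    reader.join().unwrap();
    free
}

// Bounded read: a blocking lock() returns once the read deadline has expired.
fn reconnect_after_bounded_read() -> bool {
    let (_peer, shared) = silent_connection();
    let (ready_tx, ready_rx) = mpsc::channel();
    let deadline = Some(Duration::from_millis(50));
    let reader = spawn_reader(Arc::clone(&shared), deadline, ready_tx);
    ready_rx.recv().unwrap();
    let (_new_peer, new_rx) = mpsc::channel();
    *shared.lock().unwrap() = Some(new_rx); // waits roughly until the deadline, then succeeds
    reader.join().unwrap();
    let replaced = shared.lock().unwrap().is_some();
    replaced
}

fn main() {
    println!("lock free during unbounded read: {}", lock_free_during_unbounded_read());
    println!("reconnected after bounded read: {}", reconnect_after_bounded_read());
}
```

In this sketch the first check reports false and the second reports true. If the same reasoning carries over to my client, the deadline would have to wrap reader.read() inside the spawned task (or the guard would have to be released before waiting), rather than wrapping the call to handle_read from the outside.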
I also tried to drop the lock:
let lock = reader.lock();
drop(lock);
but this did not help either.
Could somebody explain why this happens and how to fix it?