Hi Community ! I implemented TCP client using tokio
and I got an issue with lifetimes when tried to use ReadHalf
and WriteHalf
in separate methods:
error[E0726]: implicit elided lifetime not allowed here
--> src/main.rs:105:61
|
105 | async fn handle_write(&mut self, transmitter: Arc<Mutex<WriteHalf>>) {
| ^^^^^^^^^- help: indicate the anonymous lifetime: `<'_>`
|
= note: assuming a `'static` lifetime...
and this one:
error[E0759]: `self` has an anonymous lifetime `'_` but it needs to satisfy a `'static` lifetime requirement
--> src/main.rs:69:36
|
69 | pub async fn handle_connection(&mut self) {
| ^^^^^^^^^ this data with an anonymous lifetime `'_`...
...
80 | self.handle_read(Arc::clone(&receiver))
| --------- ...is used and required to live as long as `'static` here
My code is next:
use std::collections::VecDeque;
use std::time::Duration;
use std::sync::{Arc};
use tokio::net::{
TcpStream,
tcp::{
ReadHalf,
WriteHalf
}
};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::time::timeout;
use tokio::sync::{Mutex};
const TIMEOUT: u64 = 250;
pub struct HeaderCrypt {}
impl HeaderCrypt {
pub fn encrypt(&mut self, _data: &[u8]) -> Vec<u8> {
return vec![];
}
pub fn decrypt(&mut self, _data: &[u8]) -> Vec<u8> {
return vec![];
}
}
pub struct Session {
pub header_crypt: Option<HeaderCrypt>,
}
impl Session {
pub fn new() -> Self {
Self {
header_crypt: None,
}
}
}
pub struct Client {
stream: Option<TcpStream>,
input_queue: Arc<Mutex<VecDeque<Vec<u8>>>>,
output_queue: Arc<Mutex<VecDeque<Vec<u8>>>>,
session: Arc<Mutex<Session>>,
}
impl Client {
pub fn new() -> Self {
Self {
stream: None,
input_queue: Arc::new(Mutex::new(VecDeque::new())),
output_queue: Arc::new(Mutex::new(VecDeque::new())),
session: Arc::new(Mutex::new(Session::new())),
}
}
pub async fn connect(&mut self, host: &str, port: i16) {
let addr = format!("{}:{}", host, port);
match TcpStream::connect(&addr).await {
Ok(stream) => {
self.stream = Some(stream);
println!("Connected to {}", addr);
},
_ => {
panic!("Cannot connect");
},
}
}
pub async fn handle_connection(&mut self) {
let packet = vec![1, 2, 3, 4, 5];
self.output_queue.lock().await.push_back(packet);
let (rx, tx) = self.stream.as_mut().unwrap().split();
let receiver = Arc::new(Mutex::new(rx));
let transmitter = Arc::new(Mutex::new(tx));
loop {
timeout(
Duration::from_millis(TIMEOUT),
self.handle_read(Arc::clone(&receiver))
).await;
self.handle_queue().await;
timeout(
Duration::from_millis(TIMEOUT),
self.handle_write(Arc::clone(&transmitter))
).await;
}
}
async fn handle_queue(&mut self) {
let input_queue = Arc::clone(&self.input_queue);
tokio::spawn(async move {
match input_queue.lock().await.pop_front() {
Some(packet) if !packet.is_empty() => {
println!("PACKET: {:?}", packet);
},
_ => {},
}
}).await.unwrap()
}
async fn handle_write(&mut self, transmitter: Arc<Mutex<WriteHalf>>) {
let session = Arc::clone(&self.session);
let output_queue = Arc::clone(&self.output_queue);
tokio::spawn(async move {
let packet = match output_queue.lock().await.pop_front() {
Some(packet) => match session.lock().await.header_crypt.as_mut() {
Some(header_crypt) => header_crypt.encrypt(&packet),
_ => packet,
},
_ => vec![],
};
if !packet.is_empty() {
println!("PACKET: {:?}", &packet);
let mut lock = transmitter.lock().await;
lock.write(&packet).await.unwrap();
lock.flush().await.unwrap();
}
}).await.unwrap()
}
async fn handle_read(&mut self, receiver: Arc<Mutex<ReadHalf>>) {
let queue = Arc::clone(&self.input_queue);
let session = Arc::clone(&self.session);
tokio::spawn(async move {
let mut buffer = [0u8; 4096];
let mut lock = receiver.lock().await;
match lock.read(&mut buffer).await {
Ok(bytes_count) => {
println!("{}", &bytes_count);
let raw_data = match session.lock().await.header_crypt.as_mut() {
Some(header_crypt) => header_crypt.decrypt(&buffer[..bytes_count]),
_ => buffer[..bytes_count].to_vec(),
};
queue.lock().await.push_back(raw_data);
},
_ => {},
};
}).await.unwrap()
}
}
what I am trying to achieve: I need make my app to be possible to read packets from server in separate thread and put them into input_queue
in same order they come from server (to be possible to decrypt them). Once some packet added to the input_queue
, I need to process it in separate thread (without blocking I/O) and push result into output_queue
. Once packet appear in output_queue
, I need to write it back to the server in separate thread.
Maybe I do not need so much threads, I just need to make sure that read and write are not blocked by another parts of code.
The playground with implementation.
Could somebody help me to fix ?