I implemented TCP client and I need to call connect
method again when I receive connect request. For this case I call self.connect
inside thread. But got an error:
error[E0759]: `self` has an anonymous lifetime `'_` but it needs to satisfy a `'static` lifetime requirement
--> src/main.rs:84:31
|
84 | pub async fn handle_queue(&mut self) {
| ^^^^^^^^^ this data with an anonymous lifetime `'_`...
...
89 | tokio::spawn(async move {
| ------------ ...is used and required to live as long as `'static` here
|
note: `'static` lifetime requirement introduced by this bound
--> /playground/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.19.2/src/task/spawn.rs:127:28
|
127 | T: Future + Send + 'static,
| ^^^^^^^
I tried to wrap self in Arc::new() inside handle_queue
function that handle separate thread inside itself:
pub async fn handle_queue(&mut self) {
let input_queue = Arc::clone(&self.input_queue);
let output_queue = Arc::clone(&self.output_queue);
let session = Arc::clone(&self.session);
let this = Arc::new(self);
// ... rest code
this.connect(host, port);
however, this not helped.
This is my full code:
use std::collections::VecDeque;
use std::sync::{Arc};
use std::io::Error;
use tokio::net::{TcpStream};
use tokio::sync::{Mutex};
pub struct HeaderCrypt {}
impl HeaderCrypt {
pub fn encrypt(&mut self, _data: &[u8]) -> Vec<u8> {
return vec![];
}
pub fn decrypt(&mut self, _data: &[u8]) -> Vec<u8> {
return vec![];
}
}
pub struct Session {
pub header_crypt: Option<HeaderCrypt>,
}
impl Session {
pub fn new() -> Self {
Self {
header_crypt: None,
}
}
}
pub struct HandlerInput<'a> {
pub session: &'a mut Session,
pub data: Option<&'a [u8]>
}
pub enum HandlerOutput {
Data(Vec<u8>),
ConnectionRequest(String, i16),
Void,
}
pub type HandlerResult = Result<HandlerOutput, Error>;
pub type HandlerFunction<'a> = Box<dyn FnMut(&mut HandlerInput) -> HandlerResult + 'a>;
pub type ProcessorResult = Result<Vec<HandlerOutput>, Error>;
pub type ProcessorFunction<'a> = Box<dyn Fn(HandlerInput) -> ProcessorResult + Send + 'a>;
pub struct Client {
stream: Arc<Mutex<Option<TcpStream>>>,
input_queue: Arc<Mutex<VecDeque<Vec<u8>>>>,
output_queue: Arc<Mutex<VecDeque<Vec<u8>>>>,
session: Arc<Mutex<Session>>,
}
impl Client {
pub fn new() -> Self {
Self {
stream: Arc::new(Mutex::new(None)),
input_queue: Arc::new(Mutex::new(VecDeque::new())),
output_queue: Arc::new(Mutex::new(VecDeque::new())),
session: Arc::new(Mutex::new(Session::new())),
}
}
pub async fn connect(&mut self, host: &str, port: i16) {
let addr = format!("{}:{}", host, port);
match TcpStream::connect(&addr).await {
Ok(stream) => {
self.stream = Arc::new(Mutex::new(Some(stream)));
println!("Connected to {}", addr);
},
_ => {
panic!("Cannot connect");
},
}
}
pub async fn handle_connection(&mut self) {
loop {
self.handle_queue().await;
}
}
pub async fn handle_queue(&mut self) {
let input_queue = Arc::clone(&self.input_queue);
let output_queue = Arc::clone(&self.output_queue);
let session = Arc::clone(&self.session);
tokio::spawn(async move {
let mut lock = input_queue.lock();
if let Some(packet) = lock.await.pop_front() {
if !packet.is_empty() {
let processors: Vec<ProcessorFunction> = vec![];
let lock = session.lock();
if let session = &mut *lock.await {
let output_list = processors
.iter()
.filter_map(|processor| {
let result: ProcessorResult = processor(HandlerInput {
session,
data: Some(&packet),
});
result.ok()
})
.flatten()
.collect::<Vec<HandlerOutput>>();
for output in output_list {
match output {
HandlerOutput::Data(packet) => {
output_queue.lock().await.push_back(packet);
},
HandlerOutput::ConnectionRequest(host, port) => {
// issue is here !
self.connect(&host, port);
},
HandlerOutput::Void => {},
};
}
}
}
}
}).await.unwrap()
}
}
This is playground.
Could somebody explain how to fix this issue ?