`self` has an anonymous lifetime `'_` but it needs to satisfy a `'static` lifetime requirement when trying to call a self method from a spawned task

I implemented a TCP client, and I need to call its connect method again whenever I receive a connection request. To do this, I call self.connect inside a spawned task, but I get an error:

error[E0759]: `self` has an anonymous lifetime `'_` but it needs to satisfy a `'static` lifetime requirement
   --> src/main.rs:84:31
    |
84  |     pub async fn handle_queue(&mut self) {
    |                               ^^^^^^^^^ this data with an anonymous lifetime `'_`...
...
89  |         tokio::spawn(async move {
    |         ------------ ...is used and required to live as long as `'static` here
    |
note: `'static` lifetime requirement introduced by this bound
   --> /playground/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.19.2/src/task/spawn.rs:127:28
    |
127 |         T: Future + Send + 'static,
    |                            ^^^^^^^

I tried wrapping self in Arc::new() inside the handle_queue function, which spawns the separate task itself:

pub async fn handle_queue(&mut self) {
        let input_queue = Arc::clone(&self.input_queue);
        let output_queue = Arc::clone(&self.output_queue);
        let session = Arc::clone(&self.session);
        let this = Arc::new(self);

        // ... rest code
        this.connect(host, port);

However, this did not help.

This is my full code:

use std::collections::VecDeque;
use std::sync::{Arc};
use std::io::Error;

use tokio::net::{TcpStream};
use tokio::sync::{Mutex};

pub struct HeaderCrypt {}
impl HeaderCrypt {
    pub fn encrypt(&mut self, _data: &[u8]) -> Vec<u8> {
        return vec![];
    }
    pub fn decrypt(&mut self, _data: &[u8]) -> Vec<u8> {
        return vec![];
    }
}

pub struct Session {
    pub header_crypt: Option<HeaderCrypt>,
}
impl Session {
    pub fn new() -> Self {
        Self {
            header_crypt: None,
        }
    }
}

pub struct HandlerInput<'a> {
    pub session: &'a mut Session,
    pub data: Option<&'a [u8]>
}

pub enum HandlerOutput {
    Data(Vec<u8>),
    ConnectionRequest(String, i16),
    Void,
}

pub type HandlerResult = Result<HandlerOutput, Error>;

pub type HandlerFunction<'a> = Box<dyn FnMut(&mut HandlerInput) -> HandlerResult + 'a>;

pub type ProcessorResult = Result<Vec<HandlerOutput>, Error>;

pub type ProcessorFunction<'a> = Box<dyn Fn(HandlerInput) -> ProcessorResult + Send + 'a>;

pub struct Client {
    stream: Arc<Mutex<Option<TcpStream>>>,
    input_queue: Arc<Mutex<VecDeque<Vec<u8>>>>,
    output_queue: Arc<Mutex<VecDeque<Vec<u8>>>>,
    session: Arc<Mutex<Session>>,
}

impl Client {
    pub fn new() -> Self {
        Self {
            stream: Arc::new(Mutex::new(None)),
            input_queue: Arc::new(Mutex::new(VecDeque::new())),
            output_queue: Arc::new(Mutex::new(VecDeque::new())),
            session: Arc::new(Mutex::new(Session::new())),
        }
    }

    pub async fn connect(&mut self, host: &str, port: i16) {
        let addr = format!("{}:{}", host, port);
        match TcpStream::connect(&addr).await {
            Ok(stream) => {
                self.stream = Arc::new(Mutex::new(Some(stream)));
                println!("Connected to {}", addr);
            },
            _ => {
                panic!("Cannot connect");
            },
        }
    }

    pub async fn handle_connection(&mut self) {
        loop {
            self.handle_queue().await;
        }
    }
    
    pub async fn handle_queue(&mut self) {
        let input_queue = Arc::clone(&self.input_queue);
        let output_queue = Arc::clone(&self.output_queue);
        let session = Arc::clone(&self.session);

        tokio::spawn(async move {
            let mut lock = input_queue.lock();

            if let Some(packet) = lock.await.pop_front() {
                if !packet.is_empty() {
                    let processors: Vec<ProcessorFunction> = vec![];

                    let lock = session.lock();
                    if let session = &mut *lock.await {
                        let output_list = processors
                            .iter()
                            .filter_map(|processor| {
                                let result: ProcessorResult = processor(HandlerInput {
                                    session,
                                    data: Some(&packet),
                                });

                                result.ok()
                            })
                            .flatten()
                            .collect::<Vec<HandlerOutput>>();

                        for output in output_list {
                            match output {
                                HandlerOutput::Data(packet) => {
                                    output_queue.lock().await.push_back(packet);
                                },
                                HandlerOutput::ConnectionRequest(host, port) => {
                                    // issue is here !
                                    self.connect(&host, port);
                                },
                                HandlerOutput::Void => {},
                            };
                        }
                    }
                }
            }
        }).await.unwrap()
    }
}

This is playground.

Could somebody explain how to fix this issue?

As you have noticed, the issue is with calling self.connect(): the future passed to tokio::spawn() must be 'static, so it cannot hold a borrow of self. I think the most straightforward way to fix this would be to split Client::connect() into two functions, one taking &mut self and another that takes no self and produces a TcpStream, then call the latter function within Client::handle_queue() (Rust Playground):

@@ -63,11 +63,16 @@
     }
 
     pub async fn connect(&mut self, host: &str, port: i16) {
+        let mut stream = self.stream.lock().await;
+        *stream = Some(Self::connect_inner(host, port).await);
+    }
+
+    async fn connect_inner(host: &str, port: i16) -> TcpStream {
         let addr = format!("{}:{}", host, port);
         match TcpStream::connect(&addr).await {
             Ok(stream) => {
-                self.stream = Arc::new(Mutex::new(Some(stream)));
                 println!("Connected to {}", addr);
+                stream
             },
             _ => {
                 panic!("Cannot connect");
@@ -82,6 +87,7 @@
     }
     
     pub async fn handle_queue(&mut self) {
+        let stream = Arc::clone(&self.stream);
         let input_queue = Arc::clone(&self.input_queue);
         let output_queue = Arc::clone(&self.output_queue);
         let session = Arc::clone(&self.session);
@@ -114,7 +120,8 @@
                                     output_queue.lock().await.push_back(packet);
                                 },
                                 HandlerOutput::ConnectionRequest(host, port) => {
-                                    self.connect(&host, port);
+                                    let mut stream = stream.lock().await;
+                                    *stream = Some(Self::connect_inner(&host, port).await);
                                 },
                                 HandlerOutput::Void => {},
                             };
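To show the shape of that fix in isolation, here is a minimal sketch. It is not the code from the playground: to keep it dependency-free it swaps tokio::spawn for std::thread::spawn (which has the same 'static bound on what it runs), swaps the tokio Mutex for std::sync::Mutex, and uses a String as a stand-in for TcpStream. The current() accessor and the host/port parameters on handle_queue are hypothetical, added only for illustration.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical stand-in for the real Client; `String` stands in for TcpStream.
pub struct Client {
    stream: Arc<Mutex<Option<String>>>,
}

impl Client {
    pub fn new() -> Self {
        Self { stream: Arc::new(Mutex::new(None)) }
    }

    // Associated function with no `self` parameter, so calling it
    // from a `'static` closure does not borrow the Client.
    fn connect_inner(host: &str, port: u16) -> String {
        format!("{}:{}", host, port)
    }

    pub fn handle_queue(&mut self, host: String, port: u16) {
        // Clone the Arc handle; the closure owns this clone
        // instead of holding a reference to `self`.
        let stream = Arc::clone(&self.stream);

        thread::spawn(move || {
            let conn = Self::connect_inner(&host, port);
            // Replace the contents of the shared slot, not the Arc itself.
            *stream.lock().unwrap() = Some(conn);
        })
        .join()
        .unwrap();
    }

    // Hypothetical accessor, used only to observe the result.
    pub fn current(&self) -> Option<String> {
        self.stream.lock().unwrap().clone()
    }
}
```

Calling client.handle_queue("127.0.0.1".to_string(), 8080) on a fresh Client leaves the shared slot holding the new value. The closure only ever touches its own Arc clone, which is why the 'static requirement is satisfied.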

As a side note, the port should really be a u16, to allow for ports 32768 through 65535. This would also allow you to write TcpStream::connect((host, port)) instead of using format!().
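A rough sketch of the tuple form, assuming the blocking std::net types rather than tokio so that it needs no dependencies (tokio's TcpStream::connect accepts a (&str, u16) tuple in the same way). The helper names connect_tuple and loopback_roundtrip are made up for this example.

```rust
use std::io;
use std::net::{TcpListener, TcpStream};

// Hypothetical helper: connect with a (host, port) tuple, no format!() needed.
pub fn connect_tuple(host: &str, port: u16) -> io::Result<TcpStream> {
    TcpStream::connect((host, port))
}

// Hypothetical helper: bind a loopback listener on an OS-assigned port,
// connect back to it, and return the port the stream is connected to.
pub fn loopback_roundtrip() -> io::Result<u16> {
    let listener = TcpListener::bind(("127.0.0.1", 0))?;
    let port = listener.local_addr()?.port();
    let stream = connect_tuple("127.0.0.1", port)?;
    Ok(stream.peer_addr()?.port())
}
```

Since i16::MAX is 32767 and u16::MAX is 65535, only the u16 version can represent the upper half of the port range.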
