"future cannot be sent between threads safely" when using a vector of functions inside a thread

I implemented a TCP client that uses multiple threads. One of the threads processes input from the server using special handlers that I defined for this purpose. To share common data between handlers, I use a Session struct instance. However, I get an error when I try to use a vector of functions inside the thread:

error: future cannot be sent between threads safely
   --> src/main.rs:91:9
    |
91  |         tokio::spawn(async move {
    |         ^^^^^^^^^^^^ future created by async block is not `Send`
    |
    = help: the trait `Send` is not implemented for `dyn for<'r> Fn(HandlerInput<'r>) -> Result<Vec<HandlerOutput>, std::io::Error>`
note: future is not `Send` as this value is used across an await
   --> src/main.rs:99:48
    |
96  |                     let processors: Vec<ProcessorFunction> = vec![];
    |                         ---------- has type `Vec<Box<dyn for<'r> Fn(HandlerInput<'r>) -> Result<Vec<HandlerOutput>, std::io::Error>>>` which is not `Send`
...
99  |                     if let session = &mut *lock.await {
    |                                                ^^^^^^ await occurs here, with `processors` maybe used later
...
126 |                 }
    |                 - `processors` is later dropped here
note: required by a bound in `tokio::spawn`
   --> /playground/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.19.2/src/task/spawn.rs:127:21
    |
127 |         T: Future + Send + 'static,
    |                     ^^^^ required by this bound in `tokio::spawn`

This is my code:

use std::collections::VecDeque;
use std::sync::{Arc};
use std::io::Error;

use tokio::net::{TcpStream};
use tokio::sync::{Mutex};

const TIMEOUT: u64 = 250;

pub struct HeaderCrypt {}
impl HeaderCrypt {
    pub fn encrypt(&mut self, _data: &[u8]) -> Vec<u8> {
        return vec![];
    }
    pub fn decrypt(&mut self, _data: &[u8]) -> Vec<u8> {
        return vec![];
    }
}

pub struct Session {
    pub header_crypt: Option<HeaderCrypt>,
}
impl Session {
    pub fn new() -> Self {
        Self {
            header_crypt: None,
        }
    }
}

pub struct HandlerInput<'a> {
    pub session: &'a mut Session,
    pub data: Option<Vec<u8>>
}

pub enum HandlerOutput {
    Data(Vec<u8>),
    ConnectionRequest(String, i16),
    Void,
}

pub type HandlerResult = Result<HandlerOutput, Error>;

pub type HandlerFunction<'a> = Box<dyn FnMut(&mut HandlerInput) -> HandlerResult + 'a>;

pub type ProcessorResult = Result<Vec<HandlerOutput>, Error>;

pub type ProcessorFunction<'a> = Box<dyn Fn(HandlerInput) -> ProcessorResult + 'a>;

pub struct Client {
    stream: Arc<Mutex<Option<TcpStream>>>,
    input_queue: Arc<Mutex<VecDeque<Vec<u8>>>>,
    output_queue: Arc<Mutex<VecDeque<Vec<u8>>>>,
    session: Arc<Mutex<Session>>,
}

impl Client {
    pub fn new() -> Self {
        Self {
            stream: Arc::new(Mutex::new(None)),
            input_queue: Arc::new(Mutex::new(VecDeque::new())),
            output_queue: Arc::new(Mutex::new(VecDeque::new())),
            session: Arc::new(Mutex::new(Session::new())),
        }
    }

    pub async fn connect(&mut self, host: &str, port: i16) {
        let addr = format!("{}:{}", host, port);
        match TcpStream::connect(&addr).await {
            Ok(stream) => {
                self.stream = Arc::new(Mutex::new(Some(stream)));
                println!("Connected to {}", addr);
            },
            _ => {
                panic!("Cannot connect");
            },
        }
    }

    pub async fn handle_connection(&mut self) {
        loop {
            self.handle_queue().await;
        }
    }
    
    pub async fn handle_queue(&mut self) {
        let input_queue = Arc::clone(&self.input_queue);
        let output_queue = Arc::clone(&self.output_queue);
        let session = Arc::clone(&self.session);

        tokio::spawn(async move {
            let mut lock = input_queue.lock();

            if let Some(packet) = lock.await.pop_front() {
                if !packet.is_empty() {
                    let processors: Vec<ProcessorFunction> = vec![];

                    let lock = session.lock();
                    if let session = &mut *lock.await {
                        let output_list = processors
                            .iter()
                            .filter_map(|&processor| {
                                let result: ProcessorResult = processor(HandlerInput {
                                    session,
                                    data: Some(packet),
                                });

                                result.ok()
                            })
                            .flatten()
                            .collect::<Vec<HandlerOutput>>();

                        for output in output_list {
                            match output {
                                HandlerOutput::Data(packet) => {
                                    output_queue.lock().await.push_back(packet);
                                },
                                HandlerOutput::ConnectionRequest(host, port) => {
                                    // TODO: fix self here (remove)
                                    // self.connect(&host, port);
                                },
                                HandlerOutput::Void => {},
                            };
                        }
                    }
                }
            }
        }).await.unwrap()
    }
}

#[tokio::main]
async fn main() {
    let mut client = Client::new();
    client.connect("127.0.0.1", 3724).await;
    client.handle_connection().await;
}

The code is also available on the Rust Playground.

Could somebody explain why this happens and how to fix it?

To begin, consider the signature of tokio::spawn():

pub fn spawn<T>(future: T) -> JoinHandle<T::Output>
where
    T: Future + Send + 'static,
    T::Output: Send + 'static;

Notice the T: Send bound. It means that the spawned future may be moved to another thread at any await expression, so every variable that is still alive across an await must be Send. However, a ProcessorFunction is not necessarily Send: it can hold any boxed Fn, including one that is not Send. The fix is to add a Send bound to ProcessorFunction:

-pub type ProcessorFunction<'a> = Box<dyn Fn(HandlerInput) -> ProcessorResult + 'a>;
+pub type ProcessorFunction<'a> = Box<dyn Fn(HandlerInput) -> ProcessorResult + Send + 'a>;
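The bound can be checked in isolation without a runtime. Below is a minimal std-only sketch under stated assumptions: require_send is a hypothetical helper that only mirrors the T: Future + Send + 'static bound of tokio::spawn (it never runs the future), yield_point stands in for the session.lock().await, and Processor is a simplified version of the fixed alias whose closures map u8 to u8 instead of taking HandlerInput.

```rust
use std::future::Future;

// Simplified stand-in for the fixed ProcessorFunction: the real alias takes
// HandlerInput and returns ProcessorResult; here closures just map u8 -> u8.
type Processor = Box<dyn Fn(u8) -> u8 + Send + 'static>;

// Hypothetical helper with the same bound tokio::spawn puts on its argument.
// It does not run the future; it only makes the compiler check the bound.
fn require_send<T: Future + Send + 'static>(fut: T) -> T {
    fut
}

// A trivial await point standing in for `session.lock().await`.
async fn yield_point() {}

fn main() {
    let fut = require_send(async move {
        let processors: Vec<Processor> = vec![Box::new(|x: u8| x + 1)];
        yield_point().await; // `processors` is alive across this await...
        processors.iter().map(|p| p(1)).sum::<u8>() // ...because it is used here
    });
    // Removing `+ Send` from `Processor` makes the require_send call above
    // fail to compile, because the async block is then no longer Send.
    drop(fut);
    println!("the async block satisfies Future + Send + 'static");
}
```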

This code has a couple more issues that prevent it from compiling. First, .filter_map(|&processor| ...) does not compile, since the &processor pattern tries to move each Box out from behind a shared reference. Instead, we must write .filter_map(|processor| ...) and call each processor through its reference.
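Here is a small std-only sketch of that reference-based iteration. The Processor alias is a hypothetical simplification that returns Result<Vec<u8>, String> instead of ProcessorResult, so the filter_map/flatten chain can be shown on its own:

```rust
// Simplified stand-in for ProcessorFunction: several outputs, or an error.
type Processor = Box<dyn Fn(u8) -> Result<Vec<u8>, String> + Send>;

fn run_all(processors: &[Processor], input: u8) -> Vec<u8> {
    processors
        .iter()
        // `processor` is `&Processor`; calling it through the reference works.
        // Writing `|&processor|` instead would try to move the Box out of the
        // slice and fail with E0507 ("cannot move out of a shared reference").
        .filter_map(|processor| processor(input).ok())
        // Each Ok value is a Vec, so flatten them into one list of outputs.
        .flatten()
        .collect()
}

fn main() {
    let processors: Vec<Processor> = vec![
        Box::new(|x: u8| Ok::<_, String>(vec![x, x + 1])),
        Box::new(|_: u8| Err::<Vec<u8>, String>("skipped".to_string())),
        Box::new(|x: u8| Ok::<_, String>(vec![x * 10])),
    ];
    println!("{:?}", run_all(&processors, 2)); // prints [2, 3, 20]
}
```

Failed processors are silently dropped by .ok(), just as result.ok() does in the original code.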

Second, we cannot use the same owned packet for every processor, since Vec<u8> is not Copy. Instead, we must either borrow it or clone it. Here's the former solution (which I'd prefer, since it doesn't copy the data):

 pub struct HandlerInput<'a> {
     pub session: &'a mut Session,
-    pub data: Option<Vec<u8>>
+    pub data: Option<&'a [u8]>,
 }
                                 let result: ProcessorResult = processor(HandlerInput {
                                     session,
-                                    data: Some(packet),
+                                    data: Some(&packet),
                                 });
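A self-contained sketch of the borrowing approach, assuming simplified versions of the thread's types: the packets_seen field, the count_and_echo and only_type_one processors, and the process helper are all hypothetical, and the error type is String instead of std::io::Error. It shows every processor receiving the same borrowed packet plus a short-lived reborrow of the shared session:

```rust
// Simplified stand-ins for Session and HandlerInput (fields are hypothetical).
struct Session {
    packets_seen: usize,
}

struct HandlerInput<'a> {
    session: &'a mut Session,
    data: Option<&'a [u8]>, // borrowed instead of owned
}

type Processor = Box<dyn Fn(HandlerInput) -> Result<Vec<u8>, String> + Send>;

// Hypothetical processor: counts packets in the shared session and echoes them.
fn count_and_echo(input: HandlerInput) -> Result<Vec<u8>, String> {
    input.session.packets_seen += 1;
    Ok(input.data.unwrap_or_default().to_vec())
}

// Hypothetical processor: accepts only packets whose first byte is 1.
fn only_type_one(input: HandlerInput) -> Result<Vec<u8>, String> {
    match input.data {
        Some([1, rest @ ..]) => Ok(rest.to_vec()),
        _ => Err("not a type-1 packet".to_string()),
    }
}

fn process(processors: &[Processor], session: &mut Session, packet: &[u8]) -> Vec<u8> {
    processors
        .iter()
        .filter_map(|processor| {
            processor(HandlerInput {
                // Reborrow: each call gets its own short-lived &mut Session.
                session: &mut *session,
                // Every processor sees the same borrowed bytes; nothing is moved.
                data: Some(packet),
            })
            .ok()
        })
        .flatten()
        .collect()
}

fn main() {
    let processors: Vec<Processor> = vec![Box::new(count_and_echo), Box::new(only_type_one)];
    let mut session = Session { packets_seen: 0 };
    let packet = vec![1u8, 2, 3];

    let output = process(&processors, &mut session, &packet);
    // prints "[1, 2, 3, 2, 3] (packets seen: 1)"
    println!("{:?} (packets seen: {})", output, session.packets_seen);
}
```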

And the latter solution:

                                 let result: ProcessorResult = processor(HandlerInput {
                                     session,
-                                    data: Some(packet),
+                                    data: Some(packet.clone()),
                                 });

With these changes, the code compiles.


This happens because the argument you pass to tokio::spawn does not satisfy the Send trait bound. The argument is the future you construct with async move, and it is not Send because it holds a local variable, processors, that is not Send across an await point (line 99 in the error). processors is not Send because Vec<T> is only Send when T is Send. Here T is ProcessorFunction<'a>, i.e. Box<dyn Fn(HandlerInput) -> ProcessorResult + 'a>, and a Box is only Send when the type it contains is. The trait object dyn Fn(HandlerInput) -> ProcessorResult + 'a makes no promise about Send, so the compiler has to assume it is not.
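The chain of reasoning can be checked directly with a compile-time probe. This std-only sketch uses a hypothetical assert_send helper and simplified u8 -> u8 aliases in place of the real ProcessorFunction; the commented-out lines are the ones the compiler rejects:

```rust
use std::rc::Rc;

// Compile-time probe: only callable for types that implement Send.
fn assert_send<T: Send>() {}

type NotSendProcessor = Box<dyn Fn(u8) -> u8>; // like the original alias
type SendProcessor = Box<dyn Fn(u8) -> u8 + Send>; // like the fixed alias

fn main() {
    // Box<T> is Send when T is, and Vec<T> is Send when T is, so everything
    // hinges on whether the trait object itself promises Send.
    assert_send::<SendProcessor>();
    assert_send::<Vec<SendProcessor>>();

    // Uncommenting either line fails: `dyn Fn(u8) -> u8` makes no Send promise,
    // even if every closure actually stored in it happens to be Send.
    // assert_send::<NotSendProcessor>();
    // assert_send::<Vec<NotSendProcessor>>();

    // Conversely, `+ Send` makes the compiler reject non-Send closures.
    let shared = Rc::new(10u8);
    let ok: NotSendProcessor = Box::new(move |x: u8| x + *shared);
    // let err: SendProcessor = Box::new(move |x: u8| x + *shared); // Rc is not Send
    println!("{}", ok(1)); // prints 11
}
```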

How do you make the boxed type satisfy Send? You just have to tell the compiler that you will only put Send things in (and it will tell you if you try to put in something else):

pub type ProcessorFunction<'a> = Box<dyn Fn(HandlerInput) -> ProcessorResult + Send + 'a>;

Of course, this restricts the functions you can put into processors, but there is no way around that anyway (at least not without restructuring your code).
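The restriction only matters because processors is alive across an await. As one possible restructuring (not taken from either answer), a value that is not Send can still be used inside a Send future if it is created and dropped between awaits. A std-only sketch, again with a hypothetical require_send helper mirroring the bound of tokio::spawn and simplified u8 closures:

```rust
use std::future::Future;

// Deliberately not Send: no `+ Send` bound on the trait object.
type LocalProcessor = Box<dyn Fn(u8) -> u8>;

// Hypothetical helper with tokio::spawn's bound; a compile-time check only.
fn require_send<T: Future + Send + 'static>(fut: T) -> T {
    fut
}

// Stand-in for any `.await`, e.g. locking a tokio Mutex.
async fn yield_point() {}

fn main() {
    let fut = require_send(async move {
        yield_point().await; // await happens before the non-Send value exists
        let total = {
            let processors: Vec<LocalProcessor> = vec![Box::new(|x: u8| x + 1)];
            processors.iter().map(|p| p(1)).sum::<u8>()
        }; // `processors` is dropped here, before the next await
        yield_point().await; // only `total` (a u8) lives across this await
        total
    });
    drop(fut);
    println!("non-Send processors are fine if never held across an await");
}
```

Applied to the original code, this would mean taking the session lock first and building and using processors in an inner block that ends before output_queue.lock().await; whether that fits the real design is a judgment call.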

That was quite a mouthful, but I hope it walks you through the thought process. Note that there will be other errors to fix after this one.


Thank you very much for the detailed explanation!
