"future cannot be sent between threads safely" when returning a vector of handlers inside a tokio task

I want to refactor part of my code so that my processors return a Vec of functions instead of a Vec of the results of calling those functions (as it is implemented now).

I have structs that implement the Processor trait. Each processor picks a Vec of functions depending on the input and returns a Vec of the results of calling them. Here is an example of a processor:

pub fn handler(input: &mut HandlerInput) -> HandlerResult {
    Ok(HandlerOutput::Void)
}

pub struct AuthProcessor;

impl Processor for AuthProcessor {
    fn process_input(input: HandlerInput) -> ProcessorResult {
        let mut reader = Cursor::new(input.data.as_ref().unwrap());
        let opcode = reader.read_u8().unwrap();

        let handlers: Vec<HandlerFunction> = match opcode {
            0 => {
                vec![Box::new(handler)]
            },
            1 => {
                vec![Box::new(handler)]
            },
            16 => {
                vec![
                    Box::new(handler),
                    Box::new(handler)
                ]
            }
            _ => vec![],
        };

        Self::collect_responses(handlers, input)
    }
}

This is what my types look like:

pub struct HandlerInput<'a> {
    pub session: &'a mut Session,
    pub data: Option<&'a [u8]>,
}

pub enum HandlerOutput {
    Data((u32, Vec<u8>, Vec<u8>)),
    ConnectionRequest(String, u16),
    Void,
}

pub type HandlerResult = Result<HandlerOutput, Error>;

pub type HandlerFunction<'a> = Box<dyn FnMut(&mut HandlerInput) -> HandlerResult + 'a>;

pub type ProcessorResult = Result<Vec<HandlerOutput>, Error>;

pub type ProcessorFunction<'a> = Box<dyn Fn(HandlerInput) -> ProcessorResult + Send + 'a>;

pub trait Processor {
    fn process_input(input: HandlerInput) -> ProcessorResult;

    fn collect_responses(
        handlers: Vec<HandlerFunction>,
        mut input: HandlerInput
    ) -> ProcessorResult {
        let responses = handlers
            .into_iter()
            .filter_map(|mut func| func(&mut input).ok())
            .collect::<Vec<HandlerOutput>>();

        Ok(responses)
    }
}

I tried to change my types and my code a bit, but got errors like:

error: future cannot be sent between threads safely
   --> src\client\mod.rs:215:9
    |
215 |         tokio::spawn(async move {
    |         ^^^^^^^^^^^^ future created by async block is not `Send`
    |
    = help: the trait `std::marker::Send` is not implemented for `dyn for<'r, 's> FnMut(&'r mut HandlerInput<'s>) -> Result<HandlerOutput, std::io::Error>`

In short, this is what I am trying to do:

  1. Fixed the types:
pub type ProcessorResult<'a> = Vec<HandlerFunction<'a>>;

pub type ProcessorFunction<'a> = Box<dyn Fn(&'a mut HandlerInput) -> ProcessorResult<'a> + Send + 'a>;
  2. Made the processor return the handlers directly instead of Self::collect_responses(handlers, input)
  3. Refactored the code inside the task to get the handler list:
let handler_list = processors
    .iter()
    .map(|processor| processor(&mut handler_input))
    .flatten()
    .collect::<Vec<HandlerFunction>>();

for handler in handler_list {
    match handler(&mut handler_input) {
        Ok(output) => {
            match output {
                HandlerOutput::Data((opcode, header, body)) => {},
                HandlerOutput::ConnectionRequest(host, port) => {},
                HandlerOutput::Void => {},
            }
        },
        _ => {},
    };
    sleep(Duration::from_millis(WRITE_TIMEOUT)).await;
}

This is a sandbox that reproduces the old version.

This is a sandbox for what I want to achieve.

Could somebody explain what I am doing wrong?

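I got your second sandbox compiling, mostly by removing some unneeded lifetimes and adding a Send annotation. I think the unnecessary lifetime annotations caused some of the more opaque lifetime errors: tying the input borrow and the returned handlers to the same 'a keeps handler_input mutably borrowed for as long as the handler list is alive.

These lifetimes are unnecessary, and the compiler can figure them out itself. Also, HandlerFunction should be annotated as Send: tokio::spawn requires the spawned future to be Send, and the handler list is held across the sleep(...).await inside the loop, so every boxed handler in it has to be Send as well.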

pub type HandlerFunction<'a> = Box<dyn FnMut(&mut HandlerInput) -> HandlerResult + 'a>;

pub type ProcessorResult<'a> = Vec<HandlerFunction<'a>>;

pub type ProcessorFunction<'a> = Box<dyn Fn(&'a mut HandlerInput) -> ProcessorResult<'a> + Send + 'a>;

to

pub type HandlerFunction = Box<dyn FnMut(&mut HandlerInput) -> HandlerResult + Send>;

pub type ProcessorResult = Vec<HandlerFunction>;

pub type ProcessorFunction = Box<dyn Fn(&mut HandlerInput) -> ProcessorResult + Send>;
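
To illustrate why the Send bound matters, here is a minimal sketch that uses only the standard library. The names SendHandler, require_send, pause, run_handlers and block_on are made up for this illustration and are not part of the sandbox; require_send only mimics the Send requirement of tokio::spawn, and block_on is a tiny stand-in executor.

```rust
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical, simplified handler type: the `Send` bound is the point here.
type SendHandler = Box<dyn FnMut(u8) -> u8 + Send>;

// Stand-in for the `Send` requirement that `tokio::spawn` puts on its future.
fn require_send<F: Future + Send>(fut: F) -> F {
    fut
}

// Stand-in for an await point such as `sleep(...).await`.
async fn pause() {}

fn run_handlers() -> impl Future<Output = u8> + Send {
    let mut handlers: Vec<SendHandler> = Vec::new();
    handlers.push(Box::new(|x: u8| x + 1));
    handlers.push(Box::new(|x: u8| x * 2));

    require_send(async move {
        let mut acc = 1;
        for mut handler in handlers {
            acc = handler(acc);
            // The remaining handlers are still alive across this await, so
            // they are stored inside the future and must be `Send` themselves.
            pause().await;
        }
        acc
    })
}

// Tiny single-threaded executor so the sketch runs without any crates.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        // The futures in this sketch never stay pending, so polling in a loop is enough.
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return value;
        }
    }
}
```

Calling block_on(run_handlers()) runs both handlers in order. If you drop `+ Send` from SendHandler, require_send should reject the async block with a "not Send" diagnostic much like the one in the question.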

Other than that, there are just a few small things, like making Processor::process_input match the ProcessorFunction signature, and a missing mut marker.

pub trait Processor {
    fn process_input(input: HandlerInput) -> ProcessorResult;
}

pub fn handler(input: &mut HandlerInput) -> HandlerResult {
    Ok(HandlerOutput::Void)
}

pub struct AuthProcessor;

impl Processor for AuthProcessor {
    fn process_input(input: HandlerInput) -> ProcessorResult {

to

pub trait Processor {
    fn process_input(input: &mut HandlerInput) -> ProcessorResult;
}

pub fn handler(input: &mut HandlerInput) -> HandlerResult {
    Ok(HandlerOutput::Void)
}

pub struct AuthProcessor;

impl Processor for AuthProcessor {
    fn process_input(input: &mut HandlerInput) -> ProcessorResult {

and around line 153:

    for handler in handler_list {
        match handler(&mut handler_input) {
            Ok(output) => {
                match output {

to

    for mut handler in handler_list {
        match handler(&mut handler_input) {
            Ok(output) => {
                match output {
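
The mut is needed because calling an FnMut borrows the closure mutably. A small sketch of that rule, with hypothetical names (Counter, make_counters, call_each) that do not come from the sandbox:

```rust
// Hypothetical boxed FnMut type, standing in for HandlerFunction.
type Counter = Box<dyn FnMut() -> u32>;

// Build two closures that each own and mutate their own counter.
fn make_counters() -> Vec<Counter> {
    let mut counters: Vec<Counter> = Vec::new();
    for start in [10u32, 20] {
        let mut n = start;
        counters.push(Box::new(move || {
            n += 1;
            n
        }));
    }
    counters
}

fn call_each(counters: Vec<Counter>) -> Vec<u32> {
    let mut out = Vec::new();
    // Without `mut` on the loop binding this does not compile:
    // calling an `FnMut` needs a mutable borrow of the closure.
    for mut counter in counters {
        out.push(counter());
    }
    out
}
```

If the handlers never need to mutate captured state, declaring the type with Fn instead of FnMut would make the mut binding unnecessary.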

The full compiling example is below.

use std::io::Cursor;
use byteorder::ReadBytesExt;
use std::collections::VecDeque;
use std::io::Error;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;
use tokio::net::TcpStream;
use tokio::task::JoinHandle;
use tokio::time::sleep;
use futures::future::join_all;

pub struct Session {
    pub session_key: Option<Vec<u8>>,
    pub server_id: Option<u8>,
    // ... rest fields
}

impl Session {
    pub fn new() -> Self {
        Self {
            session_key: None,
            server_id: None,
        }
    }
}

pub struct HandlerInput<'a> {
    pub session: &'a mut Session,
    pub data: Option<&'a [u8]>,
}

pub enum HandlerOutput {
    Data((u32, Vec<u8>, Vec<u8>)),
    ConnectionRequest(String, u16),
    Void,
}

pub type HandlerResult = Result<HandlerOutput, Error>;

pub type HandlerFunction = Box<dyn FnMut(&mut HandlerInput) -> HandlerResult + Send>;

pub type ProcessorResult = Vec<HandlerFunction>;

pub type ProcessorFunction = Box<dyn Fn(&mut HandlerInput) -> ProcessorResult + Send>;

pub trait Processor {
    fn process_input(input: &mut HandlerInput) -> ProcessorResult;
}

pub fn handler(input: &mut HandlerInput) -> HandlerResult {
    Ok(HandlerOutput::Void)
}

pub struct AuthProcessor;

impl Processor for AuthProcessor {
    fn process_input(input: &mut HandlerInput) -> ProcessorResult {
        let mut reader = Cursor::new(input.data.as_ref().unwrap());
        let opcode = reader.read_u8().unwrap();

        let handlers: Vec<HandlerFunction> = match opcode {
            0 => {
                vec![Box::new(handler)]
            },
            1 => {
                vec![Box::new(handler)]
            },
            16 => {
                vec![
                    Box::new(handler),
                    Box::new(handler)
                ]
            }
            _ => vec![],
        };

        handlers
    }
}

const READ_TIMEOUT: u64 = 50;
const WRITE_TIMEOUT: u64 = 1;

pub struct Client {
    input_queue: Arc<Mutex<VecDeque<Vec<Vec<u8>>>>>,
    output_queue: Arc<Mutex<VecDeque<Vec<u8>>>>,
    session: Arc<Mutex<Session>>,
}

impl Client {
    pub fn new() -> Self {
        Self {
            input_queue: Arc::new(Mutex::new(VecDeque::new())),
            output_queue: Arc::new(Mutex::new(VecDeque::new())),
            session: Arc::new(Mutex::new(Session::new())),
        }
    }

    pub async fn connect(&mut self, host: &str, port: u16) -> Result<(), Error> {
        let stream = Self::connect_inner(host, port).await.unwrap();
        // ... rest code

        Ok(())
    }

    async fn connect_inner(host: &str, port: u16) -> Result<TcpStream, Error> {
        let addr = format!("{}:{}", host, port);
        match TcpStream::connect(&addr).await {
            Ok(stream) => {
                println!("Connected to {}", addr);
                Ok(stream)
            },
            Err(err) => {
                panic!("Cannot connect: {:?}", err);
            },
        }
    }

    pub async fn handle_connection(&mut self) {
        join_all(vec![
            self.handle_queue().await,
            // ... rest tasks
        ]).await;
    }

    async fn handle_queue(&mut self) -> JoinHandle<()> {
        let input_queue = Arc::clone(&self.input_queue);
        let output_queue = Arc::clone(&self.output_queue);
        let session = Arc::clone(&self.session);

        tokio::spawn(async move {
            loop {
                let mut input_queue = input_queue.lock().await;

                if let Some(packets) = input_queue.pop_front() {
                    for packet in packets {
                        let processors = Self::get_login_processors();

                        let session = &mut *session.lock().await;

                        let mut handler_input = HandlerInput {
                            session,
                            data: Some(&packet),
                        };

                        let handler_list = processors
                            .iter()
                            .map(|processor| processor(&mut handler_input))
                            .flatten()
                            .collect::<Vec<HandlerFunction>>();

                        for mut handler in handler_list {
                            match handler(&mut handler_input) {
                                Ok(output) => {
                                    match output {
                                        HandlerOutput::Data((opcode, header, body)) => {},
                                        HandlerOutput::ConnectionRequest(host, port) => {},
                                        HandlerOutput::Void => {},
                                    }
                                },
                                _ => {},
                            };
                            sleep(Duration::from_millis(WRITE_TIMEOUT)).await;
                        }
                    }
                } else {
                    sleep(Duration::from_millis(WRITE_TIMEOUT)).await;
                }
            }
        })
    }

    fn get_login_processors() -> Vec<ProcessorFunction> {
        vec![
            Box::new(AuthProcessor::process_input),
        ]
    }
}
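
If you want to play with the type shapes without tokio, here is a reduced sketch of the same pipeline using only the standard library. Input, HandlerFn, ProcessorFn, bump, process and run are hypothetical stand-ins for the sandbox types, not the real ones. Because the aliases carry no explicit 'a, each call borrows the input only for the duration of that call, so it can be borrowed again when the handlers run.

```rust
// Hypothetical stand-in for HandlerInput: it borrows some session state.
struct Input<'a> {
    count: &'a mut u32,
}

// Same shape as the fixed aliases: no explicit lifetime, plus `Send`.
type HandlerFn = Box<dyn FnMut(&mut Input) -> u32 + Send>;
type ProcessorFn = Box<dyn Fn(&mut Input) -> Vec<HandlerFn> + Send>;

// A handler that mutates the state behind the input and reports the new value.
fn bump(input: &mut Input) -> u32 {
    *input.count += 1;
    *input.count
}

// A processor that picks handlers depending on the input.
fn process(input: &mut Input) -> Vec<HandlerFn> {
    let mut handlers: Vec<HandlerFn> = vec![Box::new(bump)];
    if *input.count == 0 {
        handlers.push(Box::new(bump));
    }
    handlers
}

fn run() -> u32 {
    let mut count = 0;
    let mut input = Input { count: &mut count };
    let processors: Vec<ProcessorFn> = vec![Box::new(process)];

    // First mutable borrow of `input`: it ends once the handlers are collected.
    let handlers: Vec<HandlerFn> = processors
        .iter()
        .flat_map(|processor| processor(&mut input))
        .collect();

    // Second mutable borrow of `input`: allowed because the returned
    // handlers do not hold on to the first borrow.
    let mut last = 0;
    for mut handler in handlers {
        last = handler(&mut input);
    }
    last
}
```

Here run() collects two handlers, because count starts at 0, and then calls each of them with the same input.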

Hope this helps!


This helps! Thank you very much!
