I implemented TCP client, where I have multiple threads. One of the thread process input from server using special handlers that I defined for this purpose. To share some common data between handlers I use Session
struct instance. However, I got an error when I try to use vector of functions inside thread:
error: future cannot be sent between threads safely
--> src/main.rs:91:9
|
91 | tokio::spawn(async move {
| ^^^^^^^^^^^^ future created by async block is not `Send`
|
= help: the trait `Send` is not implemented for `dyn for<'r> Fn(HandlerInput<'r>) -> Result<Vec<HandlerOutput>, std::io::Error>`
note: future is not `Send` as this value is used across an await
--> src/main.rs:99:48
|
96 | let processors: Vec<ProcessorFunction> = vec![];
| ---------- has type `Vec<Box<dyn for<'r> Fn(HandlerInput<'r>) -> Result<Vec<HandlerOutput>, std::io::Error>>>` which is not `Send`
...
99 | if let session = &mut *lock.await {
| ^^^^^^ await occurs here, with `processors` maybe used later
...
126 | }
| - `processors` is later dropped here
note: required by a bound in `tokio::spawn`
--> /playground/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.19.2/src/task/spawn.rs:127:21
|
127 | T: Future + Send + 'static,
| ^^^^ required by this bound in `tokio::spawn`
This is my code:
use std::collections::VecDeque;
use std::sync::{Arc};
use std::io::Error;
use tokio::net::{TcpStream};
use tokio::sync::{Mutex};
const TIMEOUT: u64 = 250;
pub struct HeaderCrypt {}
impl HeaderCrypt {
pub fn encrypt(&mut self, _data: &[u8]) -> Vec<u8> {
return vec![];
}
pub fn decrypt(&mut self, _data: &[u8]) -> Vec<u8> {
return vec![];
}
}
pub struct Session {
pub header_crypt: Option<HeaderCrypt>,
}
impl Session {
pub fn new() -> Self {
Self {
header_crypt: None,
}
}
}
pub struct HandlerInput<'a> {
pub session: &'a mut Session,
pub data: Option<Vec<u8>>
}
pub enum HandlerOutput {
Data(Vec<u8>),
ConnectionRequest(String, i16),
Void,
}
pub type HandlerResult = Result<HandlerOutput, Error>;
pub type HandlerFunction<'a> = Box<dyn FnMut(&mut HandlerInput) -> HandlerResult + 'a>;
pub type ProcessorResult = Result<Vec<HandlerOutput>, Error>;
pub type ProcessorFunction<'a> = Box<dyn Fn(HandlerInput) -> ProcessorResult + 'a>;
pub struct Client {
stream: Arc<Mutex<Option<TcpStream>>>,
input_queue: Arc<Mutex<VecDeque<Vec<u8>>>>,
output_queue: Arc<Mutex<VecDeque<Vec<u8>>>>,
session: Arc<Mutex<Session>>,
}
impl Client {
pub fn new() -> Self {
Self {
stream: Arc::new(Mutex::new(None)),
input_queue: Arc::new(Mutex::new(VecDeque::new())),
output_queue: Arc::new(Mutex::new(VecDeque::new())),
session: Arc::new(Mutex::new(Session::new())),
}
}
pub async fn connect(&mut self, host: &str, port: i16) {
let addr = format!("{}:{}", host, port);
match TcpStream::connect(&addr).await {
Ok(stream) => {
self.stream = Arc::new(Mutex::new(Some(stream)));
println!("Connected to {}", addr);
},
_ => {
panic!("Cannot connect");
},
}
}
pub async fn handle_connection(&mut self) {
loop {
self.handle_queue().await;
}
}
pub async fn handle_queue(&mut self) {
let input_queue = Arc::clone(&self.input_queue);
let output_queue = Arc::clone(&self.output_queue);
let session = Arc::clone(&self.session);
tokio::spawn(async move {
let mut lock = input_queue.lock();
if let Some(packet) = lock.await.pop_front() {
if !packet.is_empty() {
let processors: Vec<ProcessorFunction> = vec![];
let lock = session.lock();
if let session = &mut *lock.await {
let output_list = processors
.iter()
.filter_map(|&processor| {
let result: ProcessorResult = processor(HandlerInput {
session,
data: Some(packet),
});
result.ok()
})
.flatten()
.collect::<Vec<HandlerOutput>>();
for output in output_list {
match output {
HandlerOutput::Data(packet) => {
output_queue.lock().await.push_back(packet);
},
HandlerOutput::ConnectionRequest(host, port) => {
// TODO: fix self here (remove)
// self.connect(&host, port);
},
HandlerOutput::Void => {},
};
}
}
}
}
}).await.unwrap()
}
}
#[tokio::main]
async fn main() {
let mut client = Client::new();
client.connect("127.0.0.1", 3724).await;
client.handle_connection().await;
}
This is playground.
Could somebody explain why this happen and how to fix this issue ?