I trying to sync my reader/writer to make them wait for some changes from another thread. For this case I use tokio::mpsc
(I also tried tokio::broadcast
which from the first sight better fits this case but faced with Lagged issue and decided to try separate mpsc for each thread). But when I send two signals one-after-one I got only first signal received (seems like smth block second signal). This is my code:
pub struct Client {
// ...
_signal_sender1: mpsc::Sender<Signal>,
_signal_receiver1: Arc<Mutex<mpsc::Receiver<Signal>>>,
_signal_sender2: mpsc::Sender<Signal>,
_signal_receiver2: Arc<Mutex<mpsc::Receiver<Signal>>>,
}
fn handle_queue(&mut self) -> JoinHandle<()> {
let input_queue_receiver = Arc::clone(&self._input_queue_receiver);
let output_queue_sender = self._output_queue_sender.clone();
let session = Arc::clone(&self.session);
let reader = Arc::clone(&self._reader);
let writer = Arc::clone(&self._writer);
let warden_crypt = Arc::clone(&self._warden_crypt);
let client_flags = Arc::clone(&self.client_flags);
let data_storage = Arc::clone(&self.data_storage);
let signal_sender1 = self._signal_sender1.clone();
let signal_sender2 = self._signal_sender2.clone();
let mut message_income = self._income_message_pipe.lock().unwrap().message_income.clone();
let dialog_income = self._income_message_pipe.lock().unwrap().dialog_income.clone();
tokio::spawn(async move {
loop {
let packets = input_queue_receiver.lock().await.recv().await;
if packets.is_some() {
let processors = {
let guard = client_flags.lock().unwrap();
let connected_to_realm = guard.contains(ClientFlags::IS_CONNECTED_TO_REALM);
match connected_to_realm {
true => Self::get_realm_processors(),
false => Self::get_login_processors(),
}
};
for packet in packets.unwrap() {
let mut input = HandlerInput {
session: Arc::clone(&session),
data: Some(&packet),
data_storage: Arc::clone(&data_storage),
message_income: message_income.clone(),
dialog_income: dialog_income.clone(),
};
let handler_list = processors
.iter()
.map(|processor| processor(&mut input))
.flatten()
.collect::<ProcessorResult>();
for mut handler in handler_list {
let response = handler.handle(&mut input).await;
match response {
Ok(output) => {
match output {
HandlerOutput::Data((opcode, header, body)) => {},
HandlerOutput::ConnectionRequest(host, port) => {
match Self::connect_inner(&host, port).await {
Ok(stream) => {
// I should send signals before Self::set_stream_halves because of reader that blocks another thread otherwise
signal_sender1.send(Signal::Reconnect).await.unwrap();
signal_sender2.send(Signal::Reconnect).await.unwrap();
Self::set_stream_halves(
stream,
Arc::clone(&reader),
Arc::clone(&writer),
).await;
message_income.send_success_message(
format!("Connected to {}:{}", host, port)
);
},
Err(err) => {
message_income.send_error_message(
err.to_string()
);
}
}
},
HandlerOutput::UpdateState(state) => {
match state {
State::SetEncryption(session_key) => {
*warden_crypt.lock().unwrap() =
Some(WardenCrypt::new(&session_key));
if let Some(reader) = &mut *reader.lock().await
{
reader.init(
&session_key,
Arc::clone(&warden_crypt)
);
signal_sender1.send(Signal::NeedSync).await.unwrap();
}
if let Some(writer) = &mut *writer.lock().await
{
writer.init(&session_key);
signal_sender2.send(Signal::NeedSync).await.unwrap();
}
},
State::SetConnectedToRealm(is_authorized) => {},
}
},
HandlerOutput::Freeze => {},
HandlerOutput::Void => {},
};
},
Err(err) => {},
};
}
}
}
}
})
}
fn handle_write(&mut self) -> JoinHandle<()> {
let output_queue_receiver = Arc::clone(&self._output_queue_receiver);
let writer = Arc::clone(&self._writer);
let signal_receiver = Arc::clone(&self._signal_receiver2);
let mut message_income = self._income_message_pipe.lock().unwrap().message_income.clone();
tokio::spawn(async move {
let mut need_sync = false;
loop {
let packet = output_queue_receiver.lock().await.recv().await;
if packet.is_some() {
let packet = packet.unwrap();
if !packet.is_empty() {
let receiver = &mut *signal_receiver.lock().await;
tokio::select! {
Some(signal) = receiver.recv() => {
match signal {
Signal::Reconnect => {
// ...
},
Signal::NeedSync => {
need_sync = true;
},
};
},
result = Self::write_packet(&writer, packet), if !need_sync => {
match result {
Ok(bytes_sent) => {
message_income.send_debug_message(
format!("{} bytes sent", bytes_sent)
);
},
Err(err) => {
message_income.send_client_message(err.to_string());
}
}
},
result = Self::check_encryptor(&writer), if need_sync => {
if result {
need_sync = false;
}
},
}
}
}
}
})
}
fn handle_read(&mut self) -> JoinHandle<()> {
let input_queue_sender = self._input_queue_sender.clone();
let reader = Arc::clone(&self._reader);
let signal_receiver = Arc::clone(&self._signal_receiver1);
let mut message_income = self._income_message_pipe.lock().unwrap().message_income.clone();
tokio::spawn(async move {
let mut need_sync = false;
loop {
let receiver = &mut *signal_receiver.lock().await;
tokio::select! {
Some(signal) = receiver.recv() => {
match signal {
Signal::Reconnect => {
// ...
},
Signal::NeedSync => {
need_sync = true;
},
};
},
Some(packets) = Self::read_packets(&reader), if !need_sync => {
input_queue_sender.send(packets).await.unwrap();
},
result = Self::check_decryptor(&reader), if need_sync => {
if result {
need_sync = false;
}
},
};
}
})
}
async fn read_packets(reader: &Arc<Mutex<Option<Reader>>>) -> Option<Vec<Vec<u8>>> {
if let Some(reader) = &mut *reader.lock().await {
if let Some(packets) = reader.read().await.ok() {
if !packets.is_empty() {
return Some(packets);
}
}
}
None
}
async fn write_packet(writer: &Arc<Mutex<Option<Writer>>>, packet: Vec<u8>) -> Result<usize, Error> {
let mut error = Error::new(ErrorKind::NotFound, "Not connected to TCP");
match &mut *writer.lock().await {
Some(writer) => {
match writer.write(&packet).await {
Ok(bytes_sent) => {
return Ok(bytes_sent);
},
Err(err) => {
error = err;
}
};
},
_ => {},
};
Err(error)
}
async fn check_decryptor(reader: &Arc<Mutex<Option<Reader>>>) -> bool {
if let Some(reader) = &mut *reader.lock().await {
return reader.has_decryptor();
}
false
}
async fn check_encryptor(reader: &Arc<Mutex<Option<Writer>>>) -> bool {
if let Some(writer) = &mut *reader.lock().await {
return writer.has_encryptor();
}
false
}
Sometimes I see that handle_write
task receive signals, sometimes one signal received by handle_read
and one by handle_write
, but in most cases handle_read
receive both and handle_write
receive nothing.
Could somebody explain me this weird behavior ?
UPD: please check handle_queue
for signal_sender1
and signal_sender2
, I have two cases there where I use this senders.