Tokio mpsc::Sender blocks by task and all cloned instances of Sender also blocks

My task blocks all instances of mpsc::Sender (that I use in other tasks), so receiver never receive any output from it (never recv). This happens after I set is_casting flag so cast_ai.manage() possible to send first packet. I left some annotations in code below, please see it in comments

So, this is my task:

fn handle_ai(
	&mut self,
	output_sender: Sender<PacketData>,
	message_income: MessageIncome
) -> JoinHandle<()> {
	let session = Arc::clone(&self.session);
	let data_storage = Arc::clone(&self.data_storage);

	let mut cast_ai = CastAI::new();

	tokio::spawn(async move {
		loop {
			let input = AIManagerInput {
				session: Arc::clone(&session),
				data_storage: Arc::clone(&data_storage),
				output_sender: output_sender.clone(),
			};
			cast_ai.manage(input).await;
			
			// when I add sleep(...).await here sender become unlocked
		}
	})
}

this is how CastAI implemented:

pub struct AI {
    state: StateFlags, // bitflags! instance
    counter: u64,
}

impl AI {
    pub fn new() -> Self {
        Self {
            state: StateFlags::NONE,
            counter: 0,
        }
    }

    pub async fn manage(&mut self, mut input: AIManagerInput) {
        let state_flags = {
            let guard = input.session.lock().unwrap();
            guard.state_flags.clone()
        };

        let action_flags = {
            let guard = input.session.lock().unwrap();
            guard.action_flags.clone()
        };

        let is_in_world = state_flags.contains(StateFlags::IN_WORLD);
        let is_casting = action_flags.contains(ActionFlags::IS_CASTING);

        if is_in_world {
            if is_casting {
                if self.state.is_empty() {
                    let spells_map: HashSet<u32> = {
                        let guard = input.session.lock().unwrap();
                        guard.spells_map.clone()
                    };

                    let spell_id: u32 = self.get_spell_id(spells_map);

                    let packet_data = self.cast_spell(spell_id).unwrap();
                    input.output_sender.send(packet_data.clone()).await.unwrap();

					// if I remove this line sender will work (it will send packets non-stop)
                    self.state.set(StateFlags::WAITING_FOR_RESULT, true);
                }
            }
        }
    }
}

This is full code:

pub struct Client {
    _reader: Arc<Mutex<Option<Reader>>>,
    _writer: Arc<Mutex<Option<Writer>>>,
    _warden_crypt: Arc<SyncMutex<Option<WardenCrypt>>>,
    _income_message_pipe: Arc<SyncMutex<IncomeMessagePipe>>,
    _outcome_message_pipe: Arc<SyncMutex<OutcomeMessagePipe>>,

    session: Arc<SyncMutex<Session>>,
    data_storage: Arc<SyncMutex<DataStorage>>,
    client_flags: Arc<SyncMutex<ClientFlags>>,
}

impl Client {
    pub fn new() -> Self {
        Self {
            _reader: Arc::new(Mutex::new(None)),
            _writer: Arc::new(Mutex::new(None)),
            _warden_crypt: Arc::new(SyncMutex::new(None)),
            _income_message_pipe: Arc::new(SyncMutex::new(IncomeMessagePipe::new())),
            _outcome_message_pipe: Arc::new(SyncMutex::new(OutcomeMessagePipe::new())),

            session: Arc::new(SyncMutex::new(Session::new())),
            data_storage: Arc::new(SyncMutex::new(DataStorage::new())),
            client_flags: Arc::new(SyncMutex::new(ClientFlags::NONE)),
        }
    }

    pub async fn connect(&mut self, host: &str, port: u16) -> Result<(), Error> {
        let mut income_pipe = self._income_message_pipe.lock().unwrap();

        return match Self::connect_inner(host, port).await {
            Ok(stream) => {
                Self::set_stream_halves(
                    stream,
                    Arc::clone(&self._reader),
                    Arc::clone(&self._writer),
                    None,
                    Arc::clone(&self._warden_crypt),
                ).await;

                match self.session.lock().unwrap().set_config(host) {
                    Ok(_) => {},
                    Err(err) => {
                        income_pipe.message_income.send_error_message(err.to_string());
                    }
                }

                income_pipe.message_income.send_success_message(
                    format!("Connected to {}:{}", host, port)
                );

                Ok(())
            },
            Err(err) => {
                income_pipe.message_income.send_error_message(
                    format!("Cannot connect: {}", err.to_string())
                );

                Err(err)
            },
        }
    }

    async fn connect_inner(host: &str, port: u16) -> Result<TcpStream, Error> {
        let addr = format!("{}:{}", host, port);
        match TcpStream::connect(&addr).await {
            Ok(stream) => Ok(stream),
            Err(err) => Err(err),
        }
    }

    async fn set_stream_halves(
        stream: TcpStream,
        reader: Arc<Mutex<Option<Reader>>>,
        writer: Arc<Mutex<Option<Writer>>>,
        session_key: Option<Vec<u8>>,
        warden_crypt: Arc<SyncMutex<Option<WardenCrypt>>>,
    ) {
        let (rx, tx) = stream.into_split();

        if session_key.is_none() {
            *reader.lock().await = Some(Reader::new(rx));
            *writer.lock().await = Some(Writer::new(tx));
        } else {
            let session_key = session_key.unwrap();
            *warden_crypt.lock().unwrap() = Some(WardenCrypt::new(&session_key));

            let mut _reader = Reader::new(rx);
            _reader.init(&session_key, Arc::clone(&warden_crypt));
            *reader.lock().await = Some(_reader);

            let mut _writer = Writer::new(tx);
            _writer.init(&session_key);
            *writer.lock().await = Some(_writer);
        }
    }

    pub async fn run(&mut self) {
        const BUFFER_SIZE: usize = 500;

        let (signal_sender, signal_receiver) = mpsc::channel::<Signal>(1);
        let (input_sender, input_receiver) = mpsc::channel::<Vec<u8>>(BUFFER_SIZE);
        let (output_sender, output_receiver) = mpsc::channel::<PacketData>(BUFFER_SIZE);

        let message_income = self._income_message_pipe.lock().unwrap().message_income.clone();
        let dialog_income = self._income_message_pipe.lock().unwrap().dialog_income.clone();

        let username = {
            let guard = self.session.lock().unwrap();
            let config = guard.get_config().unwrap();

            config.connection_data.username.clone()
        };

        {
            let mut guard = self._income_message_pipe.lock().unwrap();
            guard.message_income.send_client_message(
                format!("LOGIN_CHALLENGE as {}", &username)
            );
        }

        output_sender.send(login_challenge(&username)).await.unwrap();

        let (ws_sender, ws_receiver) = WSConnection::create().await;
        let ws_sender = Arc::new(Mutex::new(ws_sender));
        let ws_receiver = Arc::new(Mutex::new(ws_receiver));

        {
            let mut sender = ws_sender.lock().await;
            let mut receiver = ws_receiver.lock().await;

            let mut guard = self.data_storage.lock().unwrap();
            let result = QueryRegistry::players(&mut *sender, &mut *receiver)
                .get_players()
                .get_all().await;

            guard.players_map = match result {
                Ok(players) => players,
                _ => BTreeMap::new(),
            };
        }

        join_all(vec![
            self.handle_ui_render(),
            self.handle_ui_input(),
            self.handle_ui_output(),
            self.handle_ai(output_sender.clone(), message_income.clone()),
            self.handle_read(input_sender.clone(), signal_receiver, message_income.clone()),
            self.handle_write(output_receiver, message_income.clone()),
            self.handle_packet(
                input_receiver,
                signal_sender.clone(),
                output_sender.clone(),
                message_income.clone(),
                dialog_income.clone(),
                Arc::clone(&ws_sender),
                Arc::clone(&ws_receiver),
            ),
        ]).await;
    }

    fn handle_ui_input(&mut self) -> JoinHandle<()> {
        let event_income = self._income_message_pipe.lock().unwrap().event_income.clone();

        tokio::task::spawn_blocking(move || {
            let mut ui_input = UIInput::new(event_income);

            loop {
                ui_input.handle();
            }
        })
    }

    fn handle_ui_render(&mut self) -> JoinHandle<()> {
        let income_message_pipe = Arc::clone(&self._income_message_pipe);
        let dialog_outcome = self._outcome_message_pipe.lock().unwrap().dialog_outcome.clone();
        let flag_outcome = self._outcome_message_pipe.lock().unwrap().flag_outcome.clone();
        let client_flags = Arc::clone(&self.client_flags);

        tokio::task::spawn_blocking(move || {
            let mut ui = UI::new(
                CrosstermBackend::new(std::io::stdout()),
                UIOutputOptions {
                    dialog_outcome,
                    flag_outcome,
                },
            );

            loop {
                let client_flags = {
                    let guard = client_flags.lock().unwrap();
                    guard.clone()
                };

                ui.render(UIRenderOptions {
                    message: income_message_pipe.lock().unwrap().recv(),
                    client_flags: &client_flags,
                });
            }
        })
    }

    fn handle_ui_output(&mut self) -> JoinHandle<()> {
        let outcome_message_pipe = Arc::clone(&self._outcome_message_pipe);
        let session = Arc::clone(&self.session);
        let client_flags = Arc::clone(&self.client_flags);

        tokio::task::spawn_blocking(move || {
            loop {
                if let Ok(message) = outcome_message_pipe.lock().unwrap().recv() {
                    let client_flags = &mut *client_flags.lock().unwrap();

                    let mut ui_output = UIOutput::new(Arc::clone(&session), client_flags);
                    ui_output.handle(message);
                }
            }
        })
    }

    fn handle_ai(
        &mut self,
        output_sender: Sender<PacketData>,
        message_income: MessageIncome
    ) -> JoinHandle<()> {
        let session = Arc::clone(&self.session);
        let data_storage = Arc::clone(&self.data_storage);

        let mut cast_ai = CastAI::new();

        tokio::spawn(async move {
            loop {
                let input = AIManagerInput {
                    session: Arc::clone(&session),
                    data_storage: Arc::clone(&data_storage),
                    output_sender: output_sender.clone(),
                };
                cast_ai.manage(input).await;
            }
        })
    }

    fn handle_packet(
        &mut self,
        mut input_receiver: Receiver<Vec<u8>>,
        signal_sender: Sender<Signal>,
        output_sender: Sender<PacketData>,
        mut message_income: MessageIncome,
        dialog_income: DialogIncome,
        ws_sender: Arc<Mutex<WSSender<Compat<TcpStream>>>>,
        ws_receiver: Arc<Mutex<WSReceiver<Compat<TcpStream>>>>,
    ) -> JoinHandle<()> {
        let session = Arc::clone(&self.session);
        let reader = Arc::clone(&self._reader);
        let writer = Arc::clone(&self._writer);
        let warden_crypt = Arc::clone(&self._warden_crypt);
        let client_flags = Arc::clone(&self.client_flags);
        let data_storage = Arc::clone(&self.data_storage);
        let ws_sender = Arc::clone(&ws_sender);
        let ws_receiver = Arc::clone(&ws_receiver);

        tokio::spawn(async move {
            loop {
                if let Some(packet) = input_receiver.recv().await {
                    let processors = {
                        let guard = client_flags.lock().unwrap();
                        let connected_to_realm = guard.contains(ClientFlags::IS_CONNECTED_TO_REALM);

                        match connected_to_realm {
                            true => Self::get_realm_processors(),
                            false => Self::get_login_processors(),
                        }
                    };

                    let session_key = {
                        let guard = session.lock().unwrap();
                        guard.session_key.clone()
                    };

                    let mut input = HandlerInput {
                        session: Arc::clone(&session),
                        data: Some(&packet),
                        data_storage: Arc::clone(&data_storage),
                        message_income: message_income.clone(),
                        dialog_income: dialog_income.clone(),
                        ws_sender: Arc::clone(&ws_sender),
                        ws_receiver: Arc::clone(&ws_receiver),
                    };

                    let handler_list = processors
                        .iter()
                        .map(|processor| processor(&mut input))
                        .flatten()
                        .collect::<ProcessorResult>();

                    for mut handler in handler_list {
                        let response = handler.handle(&mut input).await;
                        match response {
                            Ok(output) => {
                                match output {
                                    HandlerOutput::Data((opcode, header, body)) => {
                                        let body = match opcode {
                                            Opcode::CMSG_WARDEN_DATA => {
                                                warden_crypt.lock().unwrap().as_mut().unwrap().encrypt(&body)
                                            },
                                            _ => body,
                                        };

                                        let packet = [header, body].concat();
                                        output_sender.send((opcode, packet)).await.unwrap();
                                    },
                                    HandlerOutput::ConnectionRequest(host, port) => {
                                        match Self::connect_inner(&host, port).await {
                                            Ok(stream) => {
                                                signal_sender.send(Signal::Reconnect).await.unwrap();

                                                Self::set_stream_halves(
                                                    stream,
                                                    Arc::clone(&reader),
                                                    Arc::clone(&writer),
                                                    session_key.clone(),
                                                    Arc::clone(&warden_crypt),
                                                ).await;

                                                message_income.send_success_message(
                                                    format!("Connected to {}:{}", host, port)
                                                );

                                                client_flags.lock().unwrap().set(
                                                    ClientFlags::IS_CONNECTED_TO_REALM,
                                                    true,
                                                );
                                            },
                                            Err(err) => {
                                                message_income.send_error_message(
                                                    err.to_string()
                                                );
                                            }
                                        }
                                    },
                                    HandlerOutput::Freeze => {
                                        {
                                            client_flags.lock().unwrap().set(
                                                ClientFlags::IN_FROZEN_MODE,
                                                true
                                            );
                                        }

                                        loop {
                                            let frozen_mode = client_flags
                                                .lock()
                                                .unwrap()
                                                .contains(ClientFlags::IN_FROZEN_MODE);

                                            if !frozen_mode {
                                                break;
                                            }
                                        }
                                    },
                                    HandlerOutput::Void => {},
                                };
                            },
                            Err(err) => {
                                message_income.send_error_message(err.to_string());
                            },
                        };
                    }
                }
            }
        })
    }

    fn handle_write(
        &mut self,
        mut output_receiver: Receiver<PacketData>,
        mut message_income: MessageIncome,
    ) -> JoinHandle<()> {
        let writer = Arc::clone(&self._writer);

        tokio::spawn(async move {
            loop {
                if let Some((opcode, packet)) = output_receiver.recv().await {
                    message_income.send_error_message(
                        format!("{} received: {}", opcode, packet.is_empty())
                    );
                    if !packet.is_empty() {
                        message_income.send_client_message(
                            Opcode::get_opcode_name(opcode)
                        );

                        match Self::write_packet(&writer, packet).await {
                            Ok(bytes_sent) => {
                                message_income.send_debug_message(
                                    format!("{} bytes sent", bytes_sent)
                                );
                            },
                            Err(err) => {
                                message_income.send_client_message(err.to_string());
                            }
                        }
                    }
                }
            }
        })
    }

    fn handle_read(
        &mut self,
        input_sender: Sender<Vec<u8>>,
        mut signal_receiver: Receiver<Signal>,
        mut message_income: MessageIncome,
    ) -> JoinHandle<()> {
        let reader = Arc::clone(&self._reader);

        tokio::spawn(async move {
            loop {
                tokio::select! {
                    signal = signal_receiver.recv() => {},
                    result = Self::read_packet(&reader) => {
                        match result {
                            Ok(packet) => {
                                input_sender.send(packet).await.unwrap();
                            },
                            Err(err) => {
                                message_income.send_error_message(err.to_string());
                                sleep(Duration::from_secs(1)).await;
                            }
                        }
                    },
                };
            }
        })
    }

    async fn read_packet(reader: &Arc<Mutex<Option<Reader>>>) -> Result<Vec<u8>, Error> {
        let mut error = Error::new(ErrorKind::NotFound, "Not connected to TCP");

        if let Some(reader) = &mut *reader.lock().await {
            let result = reader.read().await;
            match result {
                Ok(packet) => {
                    if !packet.is_empty() {
                        return Ok(packet);
                    }
                }
                Err(err) => {
                    error = err;
                },
            }
        }

        Err(error)
    }

    async fn write_packet(
        writer: &Arc<Mutex<Option<Writer>>>,
        packet: Vec<u8>
    ) -> Result<usize, Error> {
        let mut error = Error::new(ErrorKind::NotFound, "Not connected to TCP");

        match &mut *writer.lock().await {
            Some(writer) => {
                match writer.write(&packet).await {
                    Ok(bytes_sent) => {
                        return Ok(bytes_sent);
                    },
                    Err(err) => {
                        error = err;
                    }
                };
            },
            _ => {},
        };

        Err(error)
    }

    fn get_login_processors() -> Vec<ProcessorFunction> {
        return vec![
            Box::new(AuthProcessor::process_input),
        ];
    }

    fn get_realm_processors() -> Vec<ProcessorFunction> {
        return vec![
            Box::new(AuraProcessor::process_input),
            Box::new(CharactersProcessor::process_input),
            Box::new(ChatProcessor::process_input),
            Box::new(GameObjectProcessor::process_input),
            Box::new(InventoryProcessor::process_input),
            Box::new(MovementProcessor::process_input),
            Box::new(PlayerProcessor::process_input),
            Box::new(RealmProcessor::process_input),
            Box::new(SpellProcessor::process_input),
            Box::new(WardenProcessor::process_input),
        ];
    }
}

Could somebody explain why sender blocks and how to fix this issue ? For me it's pretty unpredictable behavior.

This loop will run as fast as sender inside cast_ai.manage can send, which seems like it's probably not what you want.

1 Like

so, using sleep in this case is what I need ? Or you could advice better approach ?

You can have it check periodically (sleep is an okay solution to that) or have it only run when the data that it uses changes.

It looks like you're working on some sort of game. Games sometimes use "ticks" to update state. For example you might have 30 ticks a second, and you only recalculate your games state 30 times a second instead of trying to do it constantly.

1 Like

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.