Only one tokio::sync::broadcast::Receiver get a message from senders. How to make all receivers get the message?

Hi folks !

I have an issue with broadcast channel and spent already a lot of time, but didn't find a clue. First of all, I am using tokio::sync::broadcast channel. I need this (or similar) type of channel, because I need to have multiple receivers and senders, which will be connected to each other (so every receiver could get message from any sender in the application).

Currently I have 2 receivers: one in handle_output method and one inside one of the tasks from ui.get_tasks().

This is my main method, where I initialize all channels:

pub async fn run(&mut self) -> AnyResult<()> {
        const BUFFER_SIZE: usize = 50;

        let (signal_sender, signal_receiver) = mpsc::channel::<Signal>(1);
        let (input_sender, input_receiver) = mpsc::channel::<Vec<u8>>(BUFFER_SIZE);
        let (output_sender, output_receiver) = mpsc::channel::<PacketOutcome>(BUFFER_SIZE);
        let (query_sender, query_receiver) = broadcast::channel::<HandlerOutput>(BUFFER_SIZE);

        let host = env::var("CURRENT_HOST").expect("CURRENT_HOST must be set");
        let port = env::var("CURRENT_PORT").expect("CURRENT_PORT must be set");

        // inside match block query_sender not works
        match Self::connect_inner(&host, u16::from_str(&port)?).await {
            Ok(stream) => {
                Self::set_stream_halves(
                    stream,
                    Arc::clone(&self._reader),
                    Arc::clone(&self._writer),
                    None,
                    Arc::clone(&self._warden_crypt),
                ).await;

                match self.session.lock().unwrap().set_config(&host) {
                    Ok(_) => {},
                    Err(err) => {
                        query_sender.send(HandlerOutput::ErrorMessage(err.to_string(), None)).unwrap();
                    }
                }

                query_sender.send(
                    HandlerOutput::SuccessMessage(
                        format!("Connected to {}:{}", host, port),
                        None
                    )
                ).unwrap();

                Ok(())
            },
            Err(err) => {
                query_sender.send(HandlerOutput::ErrorMessage(format!("Cannot connect: {}", err), None)).unwrap();

                Err(err)
            },
        }?;

        let mut features: Vec<Box<dyn Feature>> = vec![];
        cfg_if! {
            if #[cfg(feature = "ui")] {
                use crate::features::ui::UI;

                let ui = UI::new(query_sender.clone(), query_sender.subscribe());
                features.push(Box::new(ui));
            }
        }

        let features_tasks: Vec<JoinHandle<()>> =
            features.into_iter().map(|mut feature| feature.get_tasks()).flatten().collect();

        let account = {
            let guard = self.session.lock().unwrap();
            let config = guard.get_config()?;
            config.connection_data.account.to_string()
        };

        {
            query_sender.send(
                HandlerOutput::OutcomeMessage(format!("LOGIN_CHALLENGE as {}", &account), None)
            )?;
        }

        output_sender.send(login_challenge(&account)?).await?;

        let mut all_tasks = vec![
            self.handle_read(input_sender.clone(), signal_receiver, query_sender.clone()),
            self.handle_packet(input_receiver, query_sender.clone()),
            self.handle_write(output_receiver, query_sender.clone()),
            self.handle_output(
                signal_sender.clone(), output_sender.clone(), query_sender.clone(), query_receiver
            ),
        ];

        all_tasks.extend(features_tasks);

        join_all(all_tasks).await;

        Ok(())
    }

In the code above query_sender inside match block sends a message and I expect to get this message here: (ui.get_tasks() - the method I call for every feature in features_tasks from code above).

Currently the ui.get_tasks() is next:

fn get_tasks(&mut self) -> Vec<JoinHandle<()>> {
        let sender = self._sender.clone();
        let mut receiver = sender.subscribe();

        let handle_ui_render = tokio::spawn(async move {
            enable_raw_mode().unwrap();
            execute!(std::io::stdout(), EnterAlternateScreen, EnableMouseCapture).unwrap();

            let mut terminal = Terminal::new(CrosstermBackend::new(std::io::stdout())).unwrap();

            terminal.clear().unwrap();
            terminal.hide_cursor().unwrap();

            let component_options = UIComponentOptions {
                sender: sender.clone(),
            };

            let mut event_flags = UIEventFlags::NONE;
            let state_flags = UIStateFlags::NONE;
            let mut characters_modal = CharactersModal::new(component_options.clone());
            let mut debug_panel = DebugPanel::new(component_options.clone());
            let mut mode_panel = ModePanel::new(component_options.clone());
            let mut realm_modal = RealmModal::new(component_options.clone());
            let mut title = Title::new(component_options.clone());

            loop {
                if poll(Duration::from_millis(100)).unwrap() {
                    if let Ok(event) = crossterm::event::read() {
                        if let Event::Key(key) = event {
                            let crossterm::event::KeyEvent { modifiers, code, .. } = key;

                            event_flags.set(UIEventFlags::IS_EVENT_HANDLED, false);

                            characters_modal.handle_key_event(modifiers, code, &mut event_flags);
                            mode_panel.handle_key_event(modifiers, code, &mut event_flags);
                            realm_modal.handle_key_event(modifiers, code, &mut event_flags);
                            debug_panel.handle_key_event(modifiers, code, &mut event_flags);

                            if code == KeyCode::Char('c') {
                                if modifiers.contains(KeyModifiers::CONTROL) {
                                    // TODO: probably need exit from app in different way
                                    disable_raw_mode().unwrap();
                                    execute!(std::io::stdout(), LeaveAlternateScreen, DisableMouseCapture).unwrap();
                                    exit(0);
                                }
                            }
                        }

                        if let Event::Resize(_, _) = event {
                            terminal.autoresize().unwrap();
                        }
                    }
                }

                if let Ok(output) = receiver.try_recv() {
                    match output {
                        HandlerOutput::SuccessMessage(message, details) => {
                            debug_panel.add_item(LoggerOutput::Success(message, details));
                        },
                        HandlerOutput::ErrorMessage(message, details) => {
                            debug_panel.add_item(LoggerOutput::Error(message, details));
                        },
                        HandlerOutput::DebugMessage(message, details) => {
                            debug_panel.add_item(LoggerOutput::Debug(message, details));
                        },
                        HandlerOutput::IncomeMessage(message, details) => {
                            debug_panel.add_item(LoggerOutput::Income(message, details));
                        },
                        HandlerOutput::OutcomeMessage(message, details) => {
                            debug_panel.add_item(LoggerOutput::Outcome(message, details));
                        },
                        HandlerOutput::TransferCharactersList(characters) => {
                            event_flags.set(UIEventFlags::IS_CHARACTERS_MODAL_OPENED, true);
                            characters_modal.set_items(characters);
                        },
                        HandlerOutput::TransferRealmsList(realms) => {
                            event_flags.set(UIEventFlags::IS_REALM_MODAL_OPENED, true);
                            realm_modal.set_items(realms);
                        },
                        HandlerOutput::SelectRealm(_) => {
                            debug_panel.add_item(LoggerOutput::Debug("message".to_string(), None));
                        },
                        _ => {},
                    }
                }

                let in_debug_mode = { state_flags.contains(UIStateFlags::IN_DEBUG_MODE) };
                {
                    debug_panel.set_debug_mode(in_debug_mode);
                }

                terminal.draw(|frame| {
                    let chunks = Layout::default()
                        .direction(Direction::Vertical)
                        .margin(MARGIN)
                        .constraints([
                            Constraint::Length(3),
                            Constraint::Length(4),
                            Constraint::Percentage(76),
                            Constraint::Percentage(12),
                        ])
                        .split(frame.size());

                    title.render(frame, chunks[0]);
                    mode_panel.render(frame, chunks[1]);
                    debug_panel.render(frame, chunks[2]);

                    if event_flags.contains(UIEventFlags::IS_CHARACTERS_MODAL_OPENED) {
                        characters_modal.render(frame, chunks[2]);
                    }

                    if event_flags.contains(UIEventFlags::IS_REALM_MODAL_OPENED) {
                        realm_modal.render(frame, chunks[2]);
                    }

                }).unwrap();
            }
        });

        vec![
            handle_ui_render,
        ]
    }

receiver.try_recv() from code above gets nothing when I call query_sender from match block. But when I add match-case for this variant in handle_output message, seems like it receives the value:

fn handle_output(
        &mut self,
        signal_sender: Sender<Signal>,
        output_sender: Sender<PacketOutcome>,
        query_sender: BroadcastSender<HandlerOutput>,
        mut query_receiver: BroadcastReceiver<HandlerOutput>
    ) -> JoinHandle<()> {
        let session = Arc::clone(&self.session);
        let reader = Arc::clone(&self._reader);
        let writer = Arc::clone(&self._writer);
        let warden_crypt = Arc::clone(&self._warden_crypt);
        let client_state = Arc::clone(&self._state);

        tokio::spawn(async move {
            loop {
                let result = query_receiver.recv().await;
                match result {
                    Ok(output) => {
                        let session_key = {
                            let guard = session.lock().unwrap();
                            guard.session_key.clone()
                        };

                        match output {
                            HandlerOutput::Data((opcode, packet, json)) => {
                                let packet = match opcode {
                                    Opcode::CMSG_WARDEN_DATA => {
                                        let header = &packet[..OUTCOMING_HEADER_LENGTH];
                                        let body = warden_crypt.lock()
                                            .unwrap().as_mut()
                                            .unwrap().encrypt(&packet[OUTCOMING_HEADER_LENGTH..]);

                                        [header.to_vec(), body.to_vec()].concat()
                                    },
                                    _ => packet,
                                };

                                output_sender.send((opcode, packet, json)).await.unwrap();
                            },
                            HandlerOutput::ConnectionRequest(host, port) => {
                                match Self::connect_inner(&host, port).await {
                                    Ok(stream) => {
                                        signal_sender.send(Signal::Reconnect).await.unwrap();

                                        Self::set_stream_halves(
                                            stream,
                                            Arc::clone(&reader),
                                            Arc::clone(&writer),
                                            session_key.clone(),
                                            Arc::clone(&warden_crypt),
                                        ).await;

                                        query_sender.send(
                                            HandlerOutput::SuccessMessage(
                                                format!("Connected to {}:{}", host, port),
                                                None
                                            )
                                        ).unwrap();

                                        client_state.flags.lock().unwrap().set(
                                            ClientFlags::IS_CONNECTED_TO_REALM,
                                            true,
                                        );
                                    },
                                    Err(err) => {
                                        query_sender.send(
                                            HandlerOutput::ErrorMessage(err.to_string(), None)
                                        ).unwrap();
                                    }
                                }
                            },
                            HandlerOutput::Drop => {
                                break;
                            },
                            HandlerOutput::SelectRealm(realm) => {
                                session.lock().unwrap().selected_realm = Some(realm);
                                {
                                    client_state.flags.lock().unwrap().set(
                                        ClientFlags::IN_FROZEN_MODE,
                                        false
                                    );
                                    client_state.condvar.notify_all();
                                }
                            },
                            _ => {},
                        };
                    },
                    Err(err) => {
                        query_sender.send(HandlerOutput::ErrorMessage(err.to_string(), None)).unwrap();
                    },
                };
            }
        })
    }

And also when I try to send smth from UI, receiver in handle_output receives nothing.

I have a feeling like only one receiver at a time receives a message from any sender.

Could you guys please help me to find a correct way to have any amount of receivers and senders that will be connected with each other ?

I made small example, which probably highlights the problem: Rust Playground

It seems like only first sender works there.

Expected result:
1-value: [1, 0, 0]
...
2-value: [2, 0, 0]
...
3-value: [3, 0, 0]

Actual result:
1-value: [1, 0, 0]
...
2-value: [1, 0, 0]
...
3-value: [1, 0, 0]

What is happening is that task1 starts, prints the initial value, sends a new one and then loops, reading again a new value (the one it just sent) and so on. It ends up sending 5 vec![1, 0, 0] to the channel before hitting the exit condition. Then task2 starts and reads all the vecs that were sent by task1. It does send the vec![2, 0, 0] values in the meantime, but it never reaches the point of receiving them due to the exit condition triggering before that. If you try to print the values the receiver gets after your loop you'll see that the [2, 0, 0] and [3, 0, 0] values were sent, you just didn't print enough values from the receiver. Rust Playground

2 Likes

It makes sense. But my original app still has the issue. I have some thoughts on this. First thought that receiver.recv().await in the handle_output task and receiver.try_recv() in handle_ui_render task probably cause some problem.

I tried to put try_recv() into small sandbox example and when there only 2 tasks all works fine, but when 1 task contains async recv() and rest 2 tasks contains try_recv() the sandbox is hangs and I should kill the process.

This is how to reproduce this: Rust Playground

In my original app the handle_output task after some step just stops to receive anything. However, handle_ui_render receives all messages correctly.

The two tasks using try_recv() never .await anything, instead they busy loop forever, blocking the executor. It's not the best solution, but you coul try putting a yield_now().await somewhere in those loops and see if it works.

1 Like

Finally I found a possible bottleneck. It is std::sync::Condvar that I use in one of my tasks. Somehow it blocks query_receiver in another tasks.

So, this is my method where I call all tasks:

pub async fn run(&mut self) -> AnyResult<()> {
        const BUFFER_SIZE: usize = 50;

        let (signal_sender, signal_receiver) = mpsc::channel::<Signal>(1);
        let (input_sender, input_receiver) = mpsc::channel::<Vec<u8>>(BUFFER_SIZE);
        let (output_sender, output_receiver) = mpsc::channel::<PacketOutcome>(BUFFER_SIZE);
        let (query_sender, query_receiver) = broadcast::channel::<HandlerOutput>(BUFFER_SIZE);

        let host = env::var("CURRENT_HOST").expect("CURRENT_HOST must be set");
        let port = env::var("CURRENT_PORT").expect("CURRENT_PORT must be set");

        match Self::connect_inner(&host, u16::from_str(&port)?).await {
            Ok(stream) => {
                Self::set_stream_halves(
                    stream,
                    Arc::clone(&self._reader),
                    Arc::clone(&self._writer),
                    None,
                    Arc::clone(&self._warden_crypt),
                ).await;

                match self.session.lock().unwrap().set_config(&host) {
                    Ok(_) => {},
                    Err(err) => {
                        query_sender.send(HandlerOutput::ErrorMessage(err.to_string(), None)).unwrap();
                    }
                }

                // query_receiver receives nothing from this send()
                query_sender.send(
                    HandlerOutput::SuccessMessage(
                        format!("Connected to {}:{}", host, port),
                        None
                    )
                ).unwrap();

                Ok(())
            },
            Err(err) => {
                query_sender.send(HandlerOutput::ErrorMessage(format!("Cannot connect: {}", err), None)).unwrap();

                Err(err)
            },
        }?;

        let mut features: Vec<Box<dyn Feature>> = vec![];
        cfg_if! {
            if #[cfg(feature = "ui")] {
                use crate::features::ui::UI;

                let ui = UI::new(query_sender.clone(), query_sender.subscribe());
                features.push(Box::new(ui));
            }
        }

        let features_tasks: Vec<JoinHandle<()>> =
            features.into_iter().map(|mut feature| feature.get_tasks()).flatten().collect();

        let account = {
            let guard = self.session.lock().unwrap();
            let config = guard.get_config()?;
            config.connection_data.account.to_string()
        };

        {
            query_sender.send(
                HandlerOutput::OutcomeMessage(format!("LOGIN_CHALLENGE as {}", &account), None)
            )?;
        }

        output_sender.send(login_challenge(&account)?).await?;

        let mut all_tasks = vec![
            self.handle_read(input_sender.clone(), signal_receiver, query_sender.clone()),
            self.handle_packet(input_receiver, query_sender.clone()),
            self.handle_write(output_receiver, query_sender.clone()),
            self.handle_output(
                signal_sender.clone(), output_sender.clone(), query_sender.clone(), query_receiver
            ),
        ];

        all_tasks.extend(features_tasks);

        join_all(all_tasks).await;

        Ok(())
    }

in code above query_receiver receives nothing from query_sender.send(...) despite Condvar is not used yet.

This is the task where I use Condvar:

fn handle_packet(
        &mut self,
        mut input_receiver: Receiver<Vec<u8>>,
        query_sender: BroadcastSender<HandlerOutput>,
    ) -> JoinHandle<()> {
        let session = Arc::clone(&self.session);
        let client_state = Arc::clone(&self._state);
        let data_storage = Arc::clone(&self.data_storage);

        tokio::spawn(async move {
            loop {
                if let Some(packet) = input_receiver.recv().await {
                    let processors = {
                        let connected_to_realm = {
                            let flags_guard = client_state.flags.lock().unwrap();
                            flags_guard.contains(ClientFlags::IS_CONNECTED_TO_REALM)
                        };

                        match connected_to_realm {
                            true => Self::get_realm_processors(),
                            false => Self::get_login_processors(),
                        }
                    };

                    let mut input = HandlerInput {
                        session: Arc::clone(&session),
                        data: Some(packet),
                        data_storage: Arc::clone(&data_storage),
                        opcode: None,
                    };

                    let handler_list = processors
                        .iter()
                        .flat_map(|processor| processor(&mut input))
                        .collect::<ProcessorResult>();

                    for mut handler in handler_list {
                        let response = handler.handle(&mut input).await;
                        match response {
                            Ok(outputs) => {
                                for output in outputs {
                                    match output {
                                        HandlerOutput::Freeze => {
                                            {
                                                client_state.flags.lock().unwrap().set(
                                                    ClientFlags::IN_FROZEN_MODE,
                                                    true
                                                );
                                            }

                                            let mut flags = client_state.flags.lock().unwrap();

                                            while flags.contains(ClientFlags::IN_FROZEN_MODE) {
                                                flags = client_state.condvar.wait(flags).unwrap();
                                            }
                                        },
                                        _ => {
                                            query_sender.send(output).unwrap();
                                        },
                                    }
                                }
                            },
                            Err(err) => {
                                query_sender.send(HandlerOutput::ErrorMessage(err.to_string(), None)).unwrap();
                            },
                        };
                    }
                }
            }
        })
    }

and this is refactored UI tasks, which will be called together via join_all(all_tasks).await in code above:

fn get_tasks(&mut self) -> Vec<JoinHandle<()>> {
        let mut receiver = self._sender.subscribe();

        enable_raw_mode().unwrap();
        execute!(std::io::stdout(), EnterAlternateScreen, EnableMouseCapture).unwrap();

        let terminal = Arc::new(SyncMutex::new(Terminal::new(CrosstermBackend::new(std::io::stdout())).unwrap()));

        let component_options = UIComponentOptions {
            sender: self._sender.clone(),
        };

        let event_flags = Arc::new(SyncMutex::new(UIEventFlags::NONE));
        let state_flags = Arc::new(SyncMutex::new(UIStateFlags::NONE));
        let characters_modal = Arc::new(SyncMutex::new(CharactersModal::new(component_options.clone())));
        let debug_panel = Arc::new(SyncMutex::new(DebugPanel::new(component_options.clone())));
        let mode_panel = Arc::new(SyncMutex::new(ModePanel::new(component_options.clone())));
        let realm_modal = Arc::new(SyncMutex::new(RealmModal::new(component_options.clone())));
        let title = Arc::new(SyncMutex::new(Title::new(component_options.clone())));

        let handle_events = || {
            let terminal =  Arc::clone(&terminal);
            let event_flags = Arc::clone(&event_flags);
            let characters_modal = Arc::clone(&characters_modal);
            let debug_panel = Arc::clone(&debug_panel);
            let mode_panel = Arc::clone(&mode_panel);
            let realm_modal = Arc::clone(&realm_modal);

            tokio::spawn(async move {
                {
                    terminal.lock().unwrap().clear().unwrap();
                    terminal.lock().unwrap().hide_cursor().unwrap();
                }

                loop {
                    if poll(Duration::from_millis(100)).unwrap() {
                        if let Ok(event) = crossterm::event::read() {
                            if let Event::Key(key) = event {
                                let crossterm::event::KeyEvent { modifiers, code, .. } = key;

                                event_flags.lock().unwrap().set(UIEventFlags::IS_EVENT_HANDLED, false);

                                characters_modal.lock().unwrap().handle_key_event(modifiers, code, Arc::clone(&event_flags));
                                mode_panel.lock().unwrap().handle_key_event(modifiers, code, Arc::clone(&event_flags));
                                realm_modal.lock().unwrap().handle_key_event(modifiers, code, Arc::clone(&event_flags));
                                debug_panel.lock().unwrap().handle_key_event(modifiers, code, Arc::clone(&event_flags));

                                if code == KeyCode::Char('c') {
                                    if modifiers.contains(KeyModifiers::CONTROL) {
                                        // TODO: probably need exit from app in different way
                                        disable_raw_mode().unwrap();
                                        execute!(std::io::stdout(), LeaveAlternateScreen, DisableMouseCapture).unwrap();
                                        exit(0);
                                    }
                                }
                            }

                            if let Event::Resize(_, _) = event {
                                terminal.lock().unwrap().autoresize().unwrap();
                            }
                        }
                    }
                }
            })
        };

        let handle_output = || {
            let event_flags = Arc::clone(&event_flags);
            let characters_modal = Arc::clone(&characters_modal);
            let debug_panel = Arc::clone(&debug_panel);
            let realm_modal = Arc::clone(&realm_modal);

            tokio::spawn(async move {
                loop {
                    match receiver.recv().await {
                        Ok(output) => {
                            match output {
                                HandlerOutput::SuccessMessage(message, details) => {
                                    debug_panel.lock().unwrap().add_item(LoggerOutput::Success(message, details));
                                },
                                HandlerOutput::ErrorMessage(message, details) => {
                                    debug_panel.lock().unwrap().add_item(LoggerOutput::Error(message, details));
                                },
                                HandlerOutput::DebugMessage(message, details) => {
                                    debug_panel.lock().unwrap().add_item(LoggerOutput::Debug(message, details));
                                },
                                HandlerOutput::IncomeMessage(message, details) => {
                                    debug_panel.lock().unwrap().add_item(LoggerOutput::Income(message, details));
                                },
                                HandlerOutput::OutcomeMessage(message, details) => {
                                    debug_panel.lock().unwrap().add_item(LoggerOutput::Outcome(message, details));
                                },
                                HandlerOutput::TransferCharactersList(characters) => {
                                    event_flags.lock().unwrap().set(UIEventFlags::IS_CHARACTERS_MODAL_OPENED, true);
                                    characters_modal.lock().unwrap().set_items(characters);
                                },
                                HandlerOutput::TransferRealmsList(realms) => {
                                    event_flags.lock().unwrap().set(UIEventFlags::IS_REALM_MODAL_OPENED, true);
                                    realm_modal.lock().unwrap().set_items(realms);
                                },
                                HandlerOutput::SelectRealm(_) => {
                                    debug_panel.lock().unwrap().add_item(LoggerOutput::Debug("message".to_string(), None));
                                },
                                _ => {},
                            }
                        },
                        Err(err) => {
                            debug_panel.lock().unwrap().add_item(LoggerOutput::Error(err.to_string(), None));
                        },
                    }
                }
            })
        };

        let handle_render = || {
            let terminal =  Arc::clone(&terminal);
            let event_flags = Arc::clone(&event_flags);
            let characters_modal = Arc::clone(&characters_modal);
            let debug_panel = Arc::clone(&debug_panel);
            let mode_panel = Arc::clone(&mode_panel);
            let realm_modal = Arc::clone(&realm_modal);
            let title = Arc::clone(&title);
            let state_flags = Arc::clone(&state_flags);

            tokio::spawn(async move {
                loop {
                    let in_debug_mode = { state_flags.lock().unwrap().contains(UIStateFlags::IN_DEBUG_MODE) };
                    {
                        debug_panel.lock().unwrap().set_debug_mode(in_debug_mode);
                    }

                    terminal.lock().unwrap().draw(|frame| {
                        let chunks = Layout::default()
                            .direction(Direction::Vertical)
                            .margin(MARGIN)
                            .constraints([
                                Constraint::Length(3),
                                Constraint::Length(4),
                                Constraint::Percentage(76),
                                Constraint::Percentage(12),
                            ])
                            .split(frame.size());

                        title.lock().unwrap().render(frame, chunks[0]);
                        mode_panel.lock().unwrap().render(frame, chunks[1]);
                        debug_panel.lock().unwrap().render(frame, chunks[2]);

                        if event_flags.lock().unwrap().contains(UIEventFlags::IS_CHARACTERS_MODAL_OPENED) {
                            characters_modal.lock().unwrap().render(frame, chunks[2]);
                        }

                        if event_flags.lock().unwrap().contains(UIEventFlags::IS_REALM_MODAL_OPENED) {
                            realm_modal.lock().unwrap().render(frame, chunks[2]);
                        }

                    }).unwrap();
                }
            })
        };

        vec![
            handle_events(),
            handle_output(),
            handle_render(),
        ]
    }

after one of handlers returns HandlerOutput::Freeze, handle_packet method become frozen and after that step query_receiver receives nothing from any part of application. But I would also repeat that in first code example query_sender from match block sends a message which will also not received by query_receiver (but this step processed even before Condvar was used).

So, in another words, probably I need to use Condvar in another way or even use some alternative. Do you have any idea ?

I replaced the tokio::sync::broadcast with async-broadcast and query_sender from match block now sends messages as expected. So the only issue left is "no messages received after Condvar is activated".

TBH I am more than sure that issue is not in tokio::sync::broadcast, but only with Condvar and because of async code the issue appears in different period of time (depends on which crate I use for broadcasting).

Your CondVar usage is blocking the thread, which prevents your other tasks from ever running again. Don't use blocking APIs and instead use async ones to wait for some condition.

1 Like

could tokio::sync::Notify be good alternative for Condvar ?

Yes, it doesn't have the same API but you can get a pretty similar result while being async.

2 Likes

I replaced Condvar with Notify, but task seems still blocked.

This is the task where notify waits for unblock:

fn handle_packet(
        &mut self,
        mut input_receiver: Receiver<Vec<u8>>,
        query_sender: BroadcastSender<HandlerOutput>,
    ) -> JoinHandle<()> {
        let session = Arc::clone(&self.session);
        let client_state = Arc::clone(&self._state);
        let data_storage = Arc::clone(&self.data_storage);

        tokio::spawn(async move {
            loop {
                if let Some(packet) = input_receiver.recv().await {
                    let processors = {
                        let connected_to_realm = {
                            let guard = client_state.lock().await;
                            guard.flags.contains(ClientFlags::IS_CONNECTED_TO_REALM)
                        };

                        match connected_to_realm {
                            true => Self::get_realm_processors(),
                            false => Self::get_login_processors(),
                        }
                    };

                    let mut input = HandlerInput {
                        session: Arc::clone(&session),
                        data: Some(packet),
                        data_storage: Arc::clone(&data_storage),
                        opcode: None,
                    };

                    let handler_list = processors
                        .iter()
                        .flat_map(|processor| processor(&mut input))
                        .collect::<ProcessorResult>();

                    for mut handler in handler_list {
                        let response = handler.handle(&mut input).await;
                        match response {
                            Ok(outputs) => {
                                for output in outputs {
                                    match output {
                                        HandlerOutput::Freeze => {
                                            {
                                                client_state.lock().await.flags.set(
                                                    ClientFlags::IN_FROZEN_MODE,
                                                    true
                                                );
                                            }

                                            client_state.lock().await.notify.notified().await;
                                        },
                                        _ => {
                                            query_sender.broadcast(output).await.unwrap();
                                        },
                                    }
                                }
                            },
                            Err(err) => {
                                query_sender.broadcast(HandlerOutput::ErrorMessage(err.to_string(), None)).await.unwrap();
                            },
                        };
                    }
                }
            }
        })
    }

this is blocked task:

fn handle_output(
        &mut self,
        signal_sender: Sender<Signal>,
        output_sender: Sender<PacketOutcome>,
        query_sender: BroadcastSender<HandlerOutput>,
        mut query_receiver: BroadcastReceiver<HandlerOutput>
    ) -> JoinHandle<()> {
        let session = Arc::clone(&self.session);
        let reader = Arc::clone(&self._reader);
        let writer = Arc::clone(&self._writer);
        let warden_crypt = Arc::clone(&self._warden_crypt);
        let client_state = Arc::clone(&self._state);

        tokio::spawn(async move {
            loop {
                let result = query_receiver.recv().await;
                match result {
                    Ok(output) => {
                        let session_key = {
                            let guard = session.lock().unwrap();
                            guard.session_key.clone()
                        };

                        match output {
                            HandlerOutput::Data((opcode, packet, json)) => {
                                let packet = match opcode {
                                    Opcode::CMSG_WARDEN_DATA => {
                                        let header = &packet[..OUTCOMING_HEADER_LENGTH];
                                        let body = warden_crypt.lock()
                                            .unwrap().as_mut()
                                            .unwrap().encrypt(&packet[OUTCOMING_HEADER_LENGTH..]);

                                        [header.to_vec(), body.to_vec()].concat()
                                    },
                                    _ => packet,
                                };

                                output_sender.send((opcode, packet, json)).await.unwrap();
                            },
                            HandlerOutput::ConnectionRequest(host, port) => {
                                match Self::connect_inner(&host, port).await {
                                    Ok(stream) => {
                                        signal_sender.send(Signal::Reconnect).await.unwrap();

                                        Self::set_stream_halves(
                                            stream,
                                            Arc::clone(&reader),
                                            Arc::clone(&writer),
                                            session_key.clone(),
                                            Arc::clone(&warden_crypt),
                                        ).await;

                                        query_sender.broadcast(
                                            HandlerOutput::SuccessMessage(
                                                format!("Connected to {}:{}", host, port),
                                                None
                                            )
                                        ).await.unwrap();

                                        client_state.lock().await.flags.set(
                                            ClientFlags::IS_CONNECTED_TO_REALM,
                                            true,
                                        );
                                    },
                                    Err(err) => {
                                        query_sender.broadcast(
                                            HandlerOutput::ErrorMessage(err.to_string(), None)
                                        ).await.unwrap();
                                    }
                                }
                            },
                            HandlerOutput::Drop => {
                                break;
                            },
                            HandlerOutput::SelectRealm(realm) => {
                                session.lock().unwrap().selected_realm = Some(realm);
                                {
                                    client_state.lock().await.flags.set(
                                        ClientFlags::IN_FROZEN_MODE,
                                        false
                                    );
                                    client_state.lock().await.notify.notify_waiters();
                                }
                            },
                            _ => {},
                        };
                    },
                    Err(err) => {
                        query_sender.broadcast(HandlerOutput::ErrorMessage(err.to_string(), None)).await.unwrap();
                    },
                };
            }
        })
    }

could Arc<...> variables common for both tasks be the reason ? Just in case, the variables stored in Client itself:

use std::sync::{Arc, Mutex as SyncMutex};
use tokio::sync::{Mutex, Notify};

pub struct Client {
    _reader: Arc<Mutex<Option<Reader>>>,
    _writer: Arc<Mutex<Option<Writer>>>,
    _warden_crypt: Arc<SyncMutex<Option<WardenCrypt>>>,
    _state: Arc<Mutex<ClientState>>,

    session: Arc<SyncMutex<Session>>,
    data_storage: Arc<SyncMutex<DataStorage>>,
}

P.S: just in case - Notify works as expected and unfreezing the app, but query_receiver stays blocked and receives no messages.

I found weird thing. After I added query_sender.broadcast(HandlerOutput::Continue).await.unwrap() into handler_output task which uses query_receiver, another query_receiver woke up:

fn handle_output(
        &mut self,
        signal_sender: Sender<Signal>,
        output_sender: Sender<PacketOutcome>,
        query_sender: BroadcastSender<HandlerOutput>,
        mut query_receiver: BroadcastReceiver<HandlerOutput>,
        notify: Arc<Notify>,
    ) -> JoinHandle<()> {
        let session = Arc::clone(&self.session);
        let reader = Arc::clone(&self._reader);
        let writer = Arc::clone(&self._writer);
        let warden_crypt = Arc::clone(&self._warden_crypt);
        let client_flags = Arc::clone(&self._flags);

        tokio::spawn(async move {
            loop {
                let result = query_receiver.recv().await;
                match result {
                    Ok(output) => {
                        let session_key = {
                            let guard = session.lock().unwrap();
                            guard.session_key.clone()
                        };

                        match output {
                            // ...
                            HandlerOutput::SelectRealm(realm) => {
                                session.lock().unwrap().selected_realm = Some(realm);
                                notify.notify_waiters();
                            },
                            _ => {
                                // this is what I added
                                query_sender.broadcast(HandlerOutput::Continue).await.unwrap();
                            },
                        };
                    },
                    Err(err) => {
                        // ...
                    },
                };
            }
        })
    }

HandlerOutput::Continue is an empty item I added just for testing. Have somebody an idea why this behavior happens ?

Finally I found the blocking task:

let handle_events = || {
            let terminal =  Arc::clone(&terminal);
            let event_flags = Arc::clone(&event_flags);
            let characters_modal = Arc::clone(&characters_modal);
            let debug_panel = Arc::clone(&debug_panel);
            let mode_panel = Arc::clone(&mode_panel);
            let realm_modal = Arc::clone(&realm_modal);

            tokio::spawn(async move {
                {
                    terminal.lock().unwrap().clear().unwrap();
                    terminal.lock().unwrap().hide_cursor().unwrap();
                }

                loop {
                    if poll(Duration::from_millis(100)).unwrap() {
                        if let Ok(event) = crossterm::event::read() {
                            if let Event::Key(key) = event {
                                let crossterm::event::KeyEvent { modifiers, code, .. } = key;

                                event_flags.lock().unwrap().set(UIEventFlags::IS_EVENT_HANDLED, false);

                                characters_modal.lock().unwrap().handle_key_event(modifiers, code, Arc::clone(&event_flags));
                                mode_panel.lock().unwrap().handle_key_event(modifiers, code, Arc::clone(&event_flags));
                                realm_modal.lock().unwrap().handle_key_event(modifiers, code, Arc::clone(&event_flags));
                                debug_panel.lock().unwrap().handle_key_event(modifiers, code, Arc::clone(&event_flags));

                                if code == KeyCode::Char('c') {
                                    if modifiers.contains(KeyModifiers::CONTROL) {
                                        // TODO: probably need exit from app in different way
                                        disable_raw_mode().unwrap();
                                        execute!(std::io::stdout(), LeaveAlternateScreen, DisableMouseCapture).unwrap();
                                        exit(0);
                                    }
                                }
                            }

                            if let Event::Resize(_, _) = event {
                                terminal.lock().unwrap().autoresize().unwrap();
                            }
                        }
                    }
                }
            })
        };

moving this task to spawn_blocking solved the issue.

Well, previous solution was bad. I made this task async using EventStream:

tokio::spawn(async move {
                let mut reader = EventStream::new();

                loop {
                    let mut delay = sleep(Duration::from_millis(100)).fuse();
                    let mut next_event = reader.next().fuse();

                    tokio::select! {
                        _ = delay => {},
                        maybe_event = next_event => {
                            if let Some(Ok(event)) = maybe_event {
                                if let Event::Key(key) = event {
                                    let crossterm::event::KeyEvent { modifiers, code, .. } = key;

                                    event_flags.lock().unwrap().set(UIEventFlags::IS_EVENT_HANDLED, false);

                                    let outputs: Vec<HandlerOutput> = vec![
                                        characters_modal.lock().unwrap().handle_key_event(
                                            modifiers, code, Arc::clone(&event_flags)
                                        ),
                                        mode_panel.lock().unwrap().handle_key_event(
                                            modifiers, code, Arc::clone(&event_flags)
                                        ),
                                        realm_modal.lock().unwrap().handle_key_event(
                                            modifiers, code, Arc::clone(&event_flags)
                                        ),
                                        debug_panel.lock().unwrap().handle_key_event(
                                            modifiers, code, Arc::clone(&event_flags)
                                        )
                                    ].into_iter()
                                        .filter(|item| item.is_some())
                                        .flatten()
                                        .collect();

                                    for output in outputs {
                                        sender.broadcast(output).await.unwrap();
                                    }

                                    if code == KeyCode::Char('c') {
                                        if modifiers.contains(KeyModifiers::CONTROL) {
                                            // TODO: probably need exit from app in different way
                                            disable_raw_mode().unwrap();
                                            execute!(std::io::stdout(), LeaveAlternateScreen, DisableMouseCapture).unwrap();
                                            exit(0);
                                        }
                                    }
                                } else if let Event::Resize(_, _) = event {
                                    terminal.lock().unwrap().autoresize().unwrap();
                                }
                            }
                        }
                    }
                }
            })

now it seems nice.

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.