Hi folks !
I have an issue with broadcast channel and spent already a lot of time, but didn't find a clue. First of all, I am using tokio::sync::broadcast channel. I need this (or similar) type of channel, because I need to have multiple receivers and senders, which will be connected to each other (so every receiver could get message from any sender in the application).
Currently I have 2 receivers: one in handle_output method and one inside one of the tasks from ui.get_tasks().
This is my main method, where I initialize all channels:
pub async fn run(&mut self) -> AnyResult<()> {
const BUFFER_SIZE: usize = 50;
let (signal_sender, signal_receiver) = mpsc::channel::<Signal>(1);
let (input_sender, input_receiver) = mpsc::channel::<Vec<u8>>(BUFFER_SIZE);
let (output_sender, output_receiver) = mpsc::channel::<PacketOutcome>(BUFFER_SIZE);
let (query_sender, query_receiver) = broadcast::channel::<HandlerOutput>(BUFFER_SIZE);
let host = env::var("CURRENT_HOST").expect("CURRENT_HOST must be set");
let port = env::var("CURRENT_PORT").expect("CURRENT_PORT must be set");
// inside match block query_sender not works
match Self::connect_inner(&host, u16::from_str(&port)?).await {
Ok(stream) => {
Self::set_stream_halves(
stream,
Arc::clone(&self._reader),
Arc::clone(&self._writer),
None,
Arc::clone(&self._warden_crypt),
).await;
match self.session.lock().unwrap().set_config(&host) {
Ok(_) => {},
Err(err) => {
query_sender.send(HandlerOutput::ErrorMessage(err.to_string(), None)).unwrap();
}
}
query_sender.send(
HandlerOutput::SuccessMessage(
format!("Connected to {}:{}", host, port),
None
)
).unwrap();
Ok(())
},
Err(err) => {
query_sender.send(HandlerOutput::ErrorMessage(format!("Cannot connect: {}", err), None)).unwrap();
Err(err)
},
}?;
let mut features: Vec<Box<dyn Feature>> = vec![];
cfg_if! {
if #[cfg(feature = "ui")] {
use crate::features::ui::UI;
let ui = UI::new(query_sender.clone(), query_sender.subscribe());
features.push(Box::new(ui));
}
}
let features_tasks: Vec<JoinHandle<()>> =
features.into_iter().map(|mut feature| feature.get_tasks()).flatten().collect();
let account = {
let guard = self.session.lock().unwrap();
let config = guard.get_config()?;
config.connection_data.account.to_string()
};
{
query_sender.send(
HandlerOutput::OutcomeMessage(format!("LOGIN_CHALLENGE as {}", &account), None)
)?;
}
output_sender.send(login_challenge(&account)?).await?;
let mut all_tasks = vec![
self.handle_read(input_sender.clone(), signal_receiver, query_sender.clone()),
self.handle_packet(input_receiver, query_sender.clone()),
self.handle_write(output_receiver, query_sender.clone()),
self.handle_output(
signal_sender.clone(), output_sender.clone(), query_sender.clone(), query_receiver
),
];
all_tasks.extend(features_tasks);
join_all(all_tasks).await;
Ok(())
}
In the code above query_sender inside match block sends a message and I expect to get this message here: (ui.get_tasks() - the method I call for every feature in features_tasks from code above).
Currently the ui.get_tasks() is next:
fn get_tasks(&mut self) -> Vec<JoinHandle<()>> {
let sender = self._sender.clone();
let mut receiver = sender.subscribe();
let handle_ui_render = tokio::spawn(async move {
enable_raw_mode().unwrap();
execute!(std::io::stdout(), EnterAlternateScreen, EnableMouseCapture).unwrap();
let mut terminal = Terminal::new(CrosstermBackend::new(std::io::stdout())).unwrap();
terminal.clear().unwrap();
terminal.hide_cursor().unwrap();
let component_options = UIComponentOptions {
sender: sender.clone(),
};
let mut event_flags = UIEventFlags::NONE;
let state_flags = UIStateFlags::NONE;
let mut characters_modal = CharactersModal::new(component_options.clone());
let mut debug_panel = DebugPanel::new(component_options.clone());
let mut mode_panel = ModePanel::new(component_options.clone());
let mut realm_modal = RealmModal::new(component_options.clone());
let mut title = Title::new(component_options.clone());
loop {
if poll(Duration::from_millis(100)).unwrap() {
if let Ok(event) = crossterm::event::read() {
if let Event::Key(key) = event {
let crossterm::event::KeyEvent { modifiers, code, .. } = key;
event_flags.set(UIEventFlags::IS_EVENT_HANDLED, false);
characters_modal.handle_key_event(modifiers, code, &mut event_flags);
mode_panel.handle_key_event(modifiers, code, &mut event_flags);
realm_modal.handle_key_event(modifiers, code, &mut event_flags);
debug_panel.handle_key_event(modifiers, code, &mut event_flags);
if code == KeyCode::Char('c') {
if modifiers.contains(KeyModifiers::CONTROL) {
// TODO: probably need exit from app in different way
disable_raw_mode().unwrap();
execute!(std::io::stdout(), LeaveAlternateScreen, DisableMouseCapture).unwrap();
exit(0);
}
}
}
if let Event::Resize(_, _) = event {
terminal.autoresize().unwrap();
}
}
}
if let Ok(output) = receiver.try_recv() {
match output {
HandlerOutput::SuccessMessage(message, details) => {
debug_panel.add_item(LoggerOutput::Success(message, details));
},
HandlerOutput::ErrorMessage(message, details) => {
debug_panel.add_item(LoggerOutput::Error(message, details));
},
HandlerOutput::DebugMessage(message, details) => {
debug_panel.add_item(LoggerOutput::Debug(message, details));
},
HandlerOutput::IncomeMessage(message, details) => {
debug_panel.add_item(LoggerOutput::Income(message, details));
},
HandlerOutput::OutcomeMessage(message, details) => {
debug_panel.add_item(LoggerOutput::Outcome(message, details));
},
HandlerOutput::TransferCharactersList(characters) => {
event_flags.set(UIEventFlags::IS_CHARACTERS_MODAL_OPENED, true);
characters_modal.set_items(characters);
},
HandlerOutput::TransferRealmsList(realms) => {
event_flags.set(UIEventFlags::IS_REALM_MODAL_OPENED, true);
realm_modal.set_items(realms);
},
HandlerOutput::SelectRealm(_) => {
debug_panel.add_item(LoggerOutput::Debug("message".to_string(), None));
},
_ => {},
}
}
let in_debug_mode = { state_flags.contains(UIStateFlags::IN_DEBUG_MODE) };
{
debug_panel.set_debug_mode(in_debug_mode);
}
terminal.draw(|frame| {
let chunks = Layout::default()
.direction(Direction::Vertical)
.margin(MARGIN)
.constraints([
Constraint::Length(3),
Constraint::Length(4),
Constraint::Percentage(76),
Constraint::Percentage(12),
])
.split(frame.size());
title.render(frame, chunks[0]);
mode_panel.render(frame, chunks[1]);
debug_panel.render(frame, chunks[2]);
if event_flags.contains(UIEventFlags::IS_CHARACTERS_MODAL_OPENED) {
characters_modal.render(frame, chunks[2]);
}
if event_flags.contains(UIEventFlags::IS_REALM_MODAL_OPENED) {
realm_modal.render(frame, chunks[2]);
}
}).unwrap();
}
});
vec![
handle_ui_render,
]
}
receiver.try_recv() from code above gets nothing when I call query_sender from match block. But when I add match-case for this variant in handle_output message, seems like it receives the value:
fn handle_output(
&mut self,
signal_sender: Sender<Signal>,
output_sender: Sender<PacketOutcome>,
query_sender: BroadcastSender<HandlerOutput>,
mut query_receiver: BroadcastReceiver<HandlerOutput>
) -> JoinHandle<()> {
let session = Arc::clone(&self.session);
let reader = Arc::clone(&self._reader);
let writer = Arc::clone(&self._writer);
let warden_crypt = Arc::clone(&self._warden_crypt);
let client_state = Arc::clone(&self._state);
tokio::spawn(async move {
loop {
let result = query_receiver.recv().await;
match result {
Ok(output) => {
let session_key = {
let guard = session.lock().unwrap();
guard.session_key.clone()
};
match output {
HandlerOutput::Data((opcode, packet, json)) => {
let packet = match opcode {
Opcode::CMSG_WARDEN_DATA => {
let header = &packet[..OUTCOMING_HEADER_LENGTH];
let body = warden_crypt.lock()
.unwrap().as_mut()
.unwrap().encrypt(&packet[OUTCOMING_HEADER_LENGTH..]);
[header.to_vec(), body.to_vec()].concat()
},
_ => packet,
};
output_sender.send((opcode, packet, json)).await.unwrap();
},
HandlerOutput::ConnectionRequest(host, port) => {
match Self::connect_inner(&host, port).await {
Ok(stream) => {
signal_sender.send(Signal::Reconnect).await.unwrap();
Self::set_stream_halves(
stream,
Arc::clone(&reader),
Arc::clone(&writer),
session_key.clone(),
Arc::clone(&warden_crypt),
).await;
query_sender.send(
HandlerOutput::SuccessMessage(
format!("Connected to {}:{}", host, port),
None
)
).unwrap();
client_state.flags.lock().unwrap().set(
ClientFlags::IS_CONNECTED_TO_REALM,
true,
);
},
Err(err) => {
query_sender.send(
HandlerOutput::ErrorMessage(err.to_string(), None)
).unwrap();
}
}
},
HandlerOutput::Drop => {
break;
},
HandlerOutput::SelectRealm(realm) => {
session.lock().unwrap().selected_realm = Some(realm);
{
client_state.flags.lock().unwrap().set(
ClientFlags::IN_FROZEN_MODE,
false
);
client_state.condvar.notify_all();
}
},
_ => {},
};
},
Err(err) => {
query_sender.send(HandlerOutput::ErrorMessage(err.to_string(), None)).unwrap();
},
};
}
})
}
And also when I try to send smth from UI, receiver in handle_output receives nothing.
I have a feeling like only one receiver at a time receives a message from any sender.
Could you guys please help me to find a correct way to have any amount of receivers and senders that will be connected with each other ?