Need help fixing a deadlock

Hi everyone!

My app has run into a deadlock. It is a game client that connects to a game server: first to the login server, and after the auth process to the world server. I want to redirect the packets I read to my external logging server, so I added an extra queue, mpsc::channel::<IncomingPacket>(BUFFER_SIZE), and use it to forward all incoming packets to the handle_external_write task.
When the client reconnects, it replaces its TcpStream with a new one. For this purpose I use the Self::set_stream_halves method:

async fn set_stream_halves(
        stream: TcpStream,
        reader: Arc<Mutex<Option<Reader>>>,
        writer: Arc<Mutex<Option<Writer>>>,
        session_key: Option<Vec<u8>>,
        warden_crypt: Arc<SyncMutex<Option<WardenCrypt>>>,
    ) {
        let (rx, tx) = stream.into_split();

        match session_key {
            Some(session_key) => {
                *warden_crypt.lock().unwrap() = Some(WardenCrypt::new(&session_key));
                // HERE IS DEADLOCK
                *reader.lock().await = Some(
                    Reader::new(
                        rx,
                        Arc::clone(&warden_crypt),
                        true,
                        Some(Decryptor::new(&session_key)),
                    )
                );

                *writer.lock().await = Some(
                    Writer::new(
                        tx,
                        Arc::clone(&warden_crypt),
                        true,
                        Some(Encryptor::new(&session_key)),
                    )
                );
            }
            None => {
                *reader.lock().await = Some(
                    Reader::new(rx, Arc::new(SyncMutex::new(None)), false, None)
                );
                *writer.lock().await = Some(
                    Writer::new(tx, Arc::new(SyncMutex::new(None)), false, None)
                );
            }
        }
    }

The deadlock appeared after I added this line to handle_read:

// this is the queue sender for redirecting incoming packets to the handle_external_write task
incoming_sender.send(packet).await?;

and after I added the handle_external_write task. Without this extra queue and this extra task, everything works as expected.

In handle_external_write, incoming_receiver hangs on .await because nothing has been sent yet:

fn handle_external_write(
        &mut self,
        mut incoming_receiver: Receiver<IncomingPacket>,
        mut signal_receiver: BroadcastReceiver<Signal>,
    ) -> Task {
        tokio::spawn(async move {
            loop {
                tokio::select! {
                    _ = signal_receiver.recv() => {
                        break;
                    },
                    _ = incoming_receiver.recv() => {},
                }
            }

            let mut stream = loop {
                match TcpStream::connect("127.0.0.1:3788").await {
                    Ok(stream) => {
                        break stream;
                    }
                    Err(_) => {
                        tokio::time::sleep(Duration::from_secs(10)).await;
                    }
                }
            };

            // HANGS HERE
            while let Some(incoming_packet) = incoming_receiver.recv().await {
                let IncomingPacket { header: mut packet, body, .. } = incoming_packet;
                packet.extend_from_slice(&body);
                stream.write_all(&packet).await?; // write() may write only part of the buffer
            }

            Ok(())
        })
    }

This is the full example:

pub struct Client {
    _reader: Arc<Mutex<Option<Reader>>>,
    _writer: Arc<Mutex<Option<Writer>>>,
    _warden_crypt: Arc<SyncMutex<Option<WardenCrypt>>>,

    session: Arc<Mutex<Session>>,
    data_storage: Arc<SyncMutex<DataStorage>>,
}

impl Client {
    pub fn new(options: CreateOptions) -> Self {
        Self {
            _reader: Arc::new(Mutex::new(None)),
            _writer: Arc::new(Mutex::new(None)),
            _warden_crypt: Arc::new(SyncMutex::new(None)),

            session: Arc::new(Mutex::new(Session::new())),
            data_storage: options.data_storage
                .unwrap_or_else(|| Arc::new(SyncMutex::new(DataStorage::default()))),
        }
    }

    async fn connect_inner(host: &str, port: u16) -> Result<TcpStream, Error> {
        let addr = format!("{}:{}", host, port);
        TcpStream::connect(&addr).await
    }

    async fn set_stream_halves(
        stream: TcpStream,
        reader: Arc<Mutex<Option<Reader>>>,
        writer: Arc<Mutex<Option<Writer>>>,
        session_key: Option<Vec<u8>>,
        warden_crypt: Arc<SyncMutex<Option<WardenCrypt>>>,
    ) {
        let (rx, tx) = stream.into_split();

        match session_key {
            Some(session_key) => {
                *warden_crypt.lock().unwrap() = Some(WardenCrypt::new(&session_key));

                // THIS MUTEX IS LOCKED AFTER ConnectionRequest in handle_output task
                *reader.lock().await = Some(
                    Reader::new(
                        rx,
                        Arc::clone(&warden_crypt),
                        true,
                        Some(Decryptor::new(&session_key)),
                    )
                );

                *writer.lock().await = Some(
                    Writer::new(
                        tx,
                        Arc::clone(&warden_crypt),
                        true,
                        Some(Encryptor::new(&session_key)),
                    )
                );
            }
            None => {
                *reader.lock().await = Some(
                    Reader::new(rx, Arc::new(SyncMutex::new(None)), false, None)
                );
                *writer.lock().await = Some(
                    Writer::new(tx, Arc::new(SyncMutex::new(None)), false, None)
                );
            }
        }
    }

    pub async fn run(&mut self, options: RunOptions<'_>) -> anyhow::Result<()> {
        let RunOptions { account, config_path, dotenv_path, external_features } = options;
        let EnvConfig { host, port } = EnvConfig::new(
            EnvConfigParams { dotenv_path }
        )?;

        let notify = Arc::new(Notify::new());

        let (signal_sender, signal_receiver) = broadcast::<Signal>(1);
        let (output_sender, output_receiver) = mpsc::channel::<OutgoingPacket>(BUFFER_SIZE);
        // I use this queue to redirect packets from handle_read to my logging server
        let (incoming_sender, incoming_receiver) = mpsc::channel::<IncomingPacket>(BUFFER_SIZE);
        let (query_sender, query_receiver) = broadcast::<HandlerOutput>(BUFFER_SIZE);

        match Self::connect_inner(&host, port).await {
            Ok(stream) => {
                Self::set_stream_halves(
                    stream,
                    Arc::clone(&self._reader),
                    Arc::clone(&self._writer),
                    None,
                    Arc::clone(&self._warden_crypt),
                ).await;

                self.session.lock().await.set_config(&host, account, config_path)?;

                query_sender.broadcast(
                    HandlerOutput::SuccessMessage(
                        format!("Connected to {}:{}", host, port),
                        None,
                    )
                ).await?;

                Ok(())
            }
            Err(err) => {
                query_sender.broadcast(
                    HandlerOutput::ErrorMessage(format!("Cannot connect: {}", err), None)
                ).await?;

                Err(err)
            }
        }?;

        #[allow(unused_mut)]
        let mut features: Vec<Box<dyn Feature>> = external_features;
        // ...

        for feature in &mut features {
            feature.set_broadcast_channel(query_sender.clone(), query_receiver.clone());
        }

        let mut all_tasks = vec![];

        for feature in features.iter_mut() {
            all_tasks.extend(feature.get_tasks()?);
        }

        all_tasks.extend(vec![
            self.handle_read(
                signal_receiver.clone(),
                query_sender.clone(),
                incoming_sender.clone(),
                notify.clone(),
                features,
            ),
            self.handle_output(
                signal_sender.clone(),
                output_sender.clone(),
                query_sender.clone(),
                query_receiver,
                notify.clone(),
            ),
            self.handle_write(
                output_receiver,
                query_sender,
            ),
            self.handle_external_write(
                incoming_receiver,
                signal_receiver,
            ),
        ]);

        join_all(all_tasks).await;

        Ok(())
    }

    fn handle_read(
        &mut self,
        mut signal_receiver: BroadcastReceiver<Signal>,
        query_sender: BroadcastSender<HandlerOutput>,
        incoming_sender: Sender<IncomingPacket>,
        notify: Arc<Notify>,
        features: Vec<Box<dyn Feature>>,
    ) -> Task {
        let reader = Arc::clone(&self._reader);
        let session = Arc::clone(&self.session);
        let data_storage = Arc::clone(&self.data_storage);

        tokio::spawn(async move {
            let mut realm_processors = vec![];
            let mut processors = vec![];
            let mut one_time_handler_maps = vec![];
            let mut initial_processors = vec![];

            for feature in features.into_iter() {
                realm_processors.extend(feature.get_realm_processors());
                processors.extend(feature.get_login_processors());
                one_time_handler_maps.extend(feature.get_one_time_handler_maps());
                initial_processors.extend(feature.get_initial_processors());
            }

            let mut realm_processors = Some(realm_processors);

            let handler_list = initial_processors
                .iter()
                .flat_map(|processor| processor(Opcode::LOGIN_CHALLENGE as u16))
                .collect::<ProcessorResult>();

            Self::call_handlers(
                handler_list, &query_sender, &notify, HandlerInput {
                    session: Arc::clone(&session),
                    data: vec![],
                    data_storage: Arc::clone(&data_storage),
                    opcode: Opcode::LOGIN_CHALLENGE as u16,
                },
            ).await?;

            loop {
                tokio::select! {
                    _ = signal_receiver.recv() => {
                        processors = realm_processors.take().unwrap();
                    },
                    result = Self::read_packet(reader.clone()) => {
                        match result {
                            Ok(packet) => {
                                let IncomingPacket { opcode, body, .. } = packet.clone();

                                let input = HandlerInput {
                                    session: Arc::clone(&session),
                                    data: body,
                                    data_storage: Arc::clone(&data_storage),
                                    opcode,
                                };

                                let mut handler_list = processors
                                    .iter()
                                    .flat_map(|processor| processor(opcode))
                                    .collect::<ProcessorResult>();

                                for map in one_time_handler_maps.iter_mut() {
                                    if let Some(handlers) = map.remove(&opcode) {
                                        handler_list.extend(handlers)
                                    }
                                }

                                // in this case we just show the raw packet (hex)
                                if handler_list.is_empty() {
                                    let opcode_name = Opcode::get_opcode_name(
                                        input.opcode as u32
                                    ).unwrap_or(format!("Unknown opcode: {}", input.opcode));

                                    query_sender.broadcast(HandlerOutput::ResponseMessage(
                                        opcode_name,
                                        Some(encode_hex(&input.data)),
                                    )).await?;
                                }

                                Self::call_handlers(
                                    handler_list, &query_sender, &notify, input
                                ).await?;

                                incoming_sender.send(packet).await?;
                            },
                            Err(err) => {
                                // ...
                            }
                        }
                    },
                }
            }
        })
    }

    fn handle_output(
        &mut self,
        signal_sender: BroadcastSender<Signal>,
        output_sender: Sender<OutgoingPacket>,
        query_sender: BroadcastSender<HandlerOutput>,
        mut query_receiver: BroadcastReceiver<HandlerOutput>,
        notify: Arc<Notify>,
    ) -> Task {
        let session = Arc::clone(&self.session);
        let reader = Arc::clone(&self._reader);
        let writer = Arc::clone(&self._writer);
        let warden_crypt = Arc::clone(&self._warden_crypt);

        tokio::spawn(async move {
            loop {
                let result = query_receiver.recv().await;
                match result {
                    Ok(output) => {
                        match output {
                            // ...
                            HandlerOutput::ConnectionRequest(host, port) => {
                                match Self::connect_inner(&host, port).await {
                                    Ok(stream) => {
                                        signal_sender.broadcast(Signal::Reconnect).await?;

                                        let session_key = {
                                            let guard = session.lock().await;
                                            let srp = guard.srp.as_ref().unwrap();
                                            srp.session_key.to_vec()
                                        };

                                        // THIS TASK HANGS HERE
                                        Self::set_stream_halves(
                                            stream,
                                            Arc::clone(&reader),
                                            Arc::clone(&writer),
                                            Some(session_key.clone()),
                                            Arc::clone(&warden_crypt),
                                        ).await;

                                        // ...
                                    }
                                    Err(err) => {
                                        // ...
                                    }
                                }
                            }
                            // ...
                            _ => {}
                        };
                    }
                    Err(err) => {
                        // ...
                    }
                };
            }

            Ok(())
        })
    }

    fn handle_external_write(
        &mut self,
        mut incoming_receiver: Receiver<IncomingPacket>,
        mut signal_receiver: BroadcastReceiver<Signal>,
    ) -> Task {
        tokio::spawn(async move {
            loop {
                tokio::select! {
                    _ = signal_receiver.recv() => {
                        break;
                    },
                    _ = incoming_receiver.recv() => {},
                }
            }

            let mut stream = loop {
                match TcpStream::connect("127.0.0.1:3788").await {
                    Ok(stream) => {
                        break stream;
                    }
                    Err(_) => {
                        tokio::time::sleep(Duration::from_secs(10)).await;
                    }
                }
            };

            // THIS TASK HANGS HERE
            while let Some(incoming_packet) = incoming_receiver.recv().await {
                let IncomingPacket { header: mut packet, body, .. } = incoming_packet;
                packet.extend_from_slice(&body);
                stream.write_all(&packet).await?; // write() may write only part of the buffer
            }

            Ok(())
        })
    }

    async fn read_packet(reader: Arc<Mutex<Option<Reader>>>) -> anyhow::Result<IncomingPacket> {
        reader.lock().await.as_mut().unwrap().read().await
    }

    async fn write_packet(
        writer: Arc<Mutex<Option<Writer>>>,
        packet: &mut OutgoingPacket,
    ) -> anyhow::Result<usize> {
        writer.lock().await.as_mut().unwrap().write(packet).await
    }
}

Could somebody explain what I did wrong?

Hi @Anachoreta. You say the handle_output task hangs in set_stream_halves; can you add debug statements and find out on which lock? The incoming_receiver in handle_external_write is not receiving any messages, and the only place messages are sent into incoming_sender is the handle_read task. Does that task hang, and where?

Hi @maarten, thank you for your reply. Could you give me more details about which debug statements I should share? Regarding your question about the lock: it is the reader mutex. It happens inside the handle_output task, here:

HandlerOutput::ConnectionRequest(host, port) => {
    match Self::connect_inner(&host, port).await {
        Ok(stream) => {
            signal_sender.broadcast(Signal::Reconnect).await?;

            let session_key = {
                let guard = session.lock().await;
                let srp = guard.srp.as_ref().unwrap();
                srp.session_key.to_vec()
            };

            // THE DEADLOCK IS HERE
            Self::set_stream_halves(
                stream,
                Arc::clone(&reader),
                Arc::clone(&writer),
                Some(session_key.clone()),
                Arc::clone(&warden_crypt),
            ).await;

            query_sender.broadcast(
                HandlerOutput::SuccessMessage(
                    format!("Connected to {}:{}", host, port),
                    None,
                )
            ).await?;
        }
        Err(err) => {
            query_sender.broadcast(
                HandlerOutput::ErrorMessage(err.to_string(), None)
            ).await?;
        }
    }
}

And this is the place in Self::set_stream_halves where the deadlock occurs:

async fn set_stream_halves(
        stream: TcpStream,
        reader: Arc<Mutex<Option<Reader>>>,
        writer: Arc<Mutex<Option<Writer>>>,
        session_key: Option<Vec<u8>>,
        warden_crypt: Arc<SyncMutex<Option<WardenCrypt>>>,
    ) {
        let (rx, tx) = stream.into_split();

        match session_key {
            Some(session_key) => {
                *warden_crypt.lock().unwrap() = Some(WardenCrypt::new(&session_key));
                // DEADLOCK IS HERE
                *reader.lock().await = Some(
                    Reader::new(
                        rx,
                        Arc::clone(&warden_crypt),
                        true,
                        Some(Decryptor::new(&session_key)),
                    )
                );

                *writer.lock().await = Some(
                    Writer::new(
                        tx,
                        Arc::clone(&warden_crypt),
                        true,
                        Some(Encryptor::new(&session_key)),
                    )
                );
            }
            None => {
                *reader.lock().await = Some(
                    Reader::new(rx, Arc::new(SyncMutex::new(None)), false, None)
                );
                *writer.lock().await = Some(
                    Writer::new(tx, Arc::new(SyncMutex::new(None)), false, None)
                );
            }
        }
    }

I marked the places with comments. To me it looks like the reader is somehow blocked by incoming_sender.send(packet).await?; or by the receiver that waits for it.
I did some experiments with the receiver. For example, I added a sleep in handle_external_write, and this changed the situation a bit:

fn handle_external_write(
        &mut self,
        mut incoming_receiver: Receiver<IncomingPacket>,
        mut signal_receiver: BroadcastReceiver<Signal>,
    ) -> Task {
        tokio::spawn(async move {
            loop {
                tokio::select! {
                    _ = signal_receiver.recv() => {
                        break;
                    },
                    _ = incoming_receiver.recv() => {
                        // THE TIMER WAS ADDED HERE
                        tokio::time::sleep(Duration::from_secs(1)).await;
                    },
                }
            }

            let mut stream = loop {
                match TcpStream::connect("127.0.0.1:3788").await {
                    Ok(stream) => {
                        break stream;
                    }
                    Err(_) => {
                        tokio::time::sleep(Duration::from_secs(10)).await;
                    }
                }
            };

            while let Some(incoming_packet) = incoming_receiver.recv().await {
                let IncomingPacket { header: mut packet, body, .. } = incoming_packet;
                packet.extend_from_slice(&body);
                stream.write_all(&packet).await?; // write() may write only part of the buffer
            }

            Ok(())
        })
    }

What the task above does: it ignores every packet it receives on incoming_receiver until the reconnection signal arrives. Then it connects to my local server or, if that server is not available, waits until it becomes available.

Right, I had missed that because the comment for the exact location in set_stream_halves is not in your full code listing.

So handle_output is blocking forever while trying to lock the mutex for the reader. The only other location this mutex is locked (in the code you gave) is in handle_read, which calls the read_packet function. Could handle_read somehow be stuck there? Or else, what is that task doing when your program hangs?
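A minimal model of the situation asked about here, using plain threads and std::sync::Mutex rather than the client's tokio code (swap_is_blocked_during_read is a hypothetical name): if the read side takes the reader lock and then blocks on a read that does not complete, whoever wants to swap in a new reader cannot take the lock until a packet arrives.

```rust
use std::sync::mpsc::{self, Receiver};
use std::sync::{Arc, Mutex, TryLockError};
use std::thread;

/// Returns true if the "swap" side could not take the reader lock while the
/// read side was parked in a blocking read with that lock held.
pub fn swap_is_blocked_during_read() -> bool {
    // The "stream": a channel on which no packet arrives until we send one.
    let (packet_tx, packet_rx) = mpsc::channel::<u8>();
    let reader: Arc<Mutex<Receiver<u8>>> = Arc::new(Mutex::new(packet_rx));
    let (locked_tx, locked_rx) = mpsc::channel::<()>();

    let read_side = {
        let reader = Arc::clone(&reader);
        thread::spawn(move || {
            let guard = reader.lock().unwrap(); // take the lock first...
            locked_tx.send(()).unwrap();
            guard.recv().ok() // ...then block on the read while still holding it
        })
    };

    // Wait until the read side definitely holds the lock.
    locked_rx.recv().unwrap();
    // The swap side (handle_output in the real code) cannot get in now.
    let blocked = matches!(reader.try_lock(), Err(TryLockError::WouldBlock));

    // A packet finally arrives: the read returns and the guard is dropped.
    packet_tx.send(42).unwrap();
    assert_eq!(read_side.join().unwrap(), Some(42));
    assert!(reader.try_lock().is_ok());
    blocked
}
```

In the async client the same shape appears in read_packet: the guard from reader.lock().await is a temporary that lives until .read().await returns, so the lock is held for the whole read.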


Thank you for your questions; they helped me look at the problem from a different angle. The problem was in handle_read: it seems that, because of the loop + tokio::select!, the runtime had no chance to switch control to handle_output (because the reader was always locked). I cannot say 100% why it worked before incoming_sender was added (probably the code ran fast enough that the race condition never showed up).

So, as a fix, I added a sleep to the tokio::select! branch where signal_receiver.recv() is handled:

fn handle_read(
        &mut self,
        mut signal_receiver: BroadcastReceiver<Signal>,
        query_sender: BroadcastSender<HandlerOutput>,
        incoming_sender: Sender<IncomingPacket>,
        notify: Arc<Notify>,
        features: Vec<Box<dyn Feature>>,
    ) -> Task {
        let reader = Arc::clone(&self._reader);
        let session = Arc::clone(&self.session);
        let data_storage = Arc::clone(&self.data_storage);

        tokio::spawn(async move {
            // ...

            loop {
                tokio::select! {
                    _ = signal_receiver.recv() => {
                        processors = realm_processors.take().unwrap();
                        // I ADDED THIS SLEEP
                        tokio::time::sleep(Duration::from_millis(1)).await;
                    },
                    result = Self::read_packet(reader.clone()) => {
                        match result {
                            Ok(packet) => {
                                let IncomingPacket { opcode, body, header } = packet.clone();

                                let input = HandlerInput {
                                    session: Arc::clone(&session),
                                    data: body,
                                    data_storage: Arc::clone(&data_storage),
                                    opcode,
                                };

                                let mut handler_list = processors
                                    .iter()
                                    .flat_map(|processor| processor(opcode))
                                    .collect::<ProcessorResult>();

                                for map in one_time_handler_maps.iter_mut() {
                                    if let Some(handlers) = map.remove(&opcode) {
                                        handler_list.extend(handlers)
                                    }
                                }

                                // if true, there are no handlers defined for the current opcode
                                // in this case we just show the raw packet (hex)
                                if handler_list.is_empty() {
                                    let opcode_name = Opcode::get_opcode_name(
                                        input.opcode as u32
                                    ).unwrap_or(format!("Unknown opcode: {}", input.opcode));

                                    query_sender.broadcast(HandlerOutput::ResponseMessage(
                                        opcode_name,
                                        Some(encode_hex(&input.data)),
                                    )).await?;
                                }

                                Self::call_handlers(
                                    handler_list, &query_sender, &notify, input
                                ).await?;

                                incoming_sender.send(packet).await?;
                            },
                            Err(err) => {
                                // ...
                            }
                        }
                    },
                }
            }
        })
    }

The question is: how good is this fix? It works and it helped, but is it a proper solution?

Congrats on finding the problem. As you said, it is probably not yet a robust solution, though.

Without having read and understood the code thoroughly, it strikes me as odd that the handle_read task would be reading from a stream that you're attempting to replace after a reconnect. Isn't the point of reconnecting to restore connection after the original connection got lost? Shouldn't the read task be in some sort of error mode at that point, e.g. by disabling the read_packet branch of the select?

Alternatively, if you somehow do need to keep reading from the original stream while it gets replaced, it could be placed in a Mutex<Arc<_>>, so that you can block on a read from the "latest" stream without holding on to the lock until you get the signal; that signal should then probably be sent after the reader stream has been replaced. That is a pattern that seems to be well encapsulated by tokio::sync::watch::{Sender, Receiver}.


Well, this is part of my console WoW client (tentacli). It connects to the login server, authenticates there, and then connects to the world server. That's why I replace the connection.

The full code is here: tentacli/src/primary/client.rs at IC-144 · idewave/tentacli · GitHub

The code discussed in this topic is part of the relay feature, which relays packets from the game server my WoW client is connected to over to my special server: GitHub - idewave/tine: Tine is not emulator, it is just a tiny wow server for v3.3.5a

The next step is to connect to this special server and see all relayed packets as part of the game process; in other words, to see how tentacli sees the world and what's going on around it.

That is a pattern that seems to be well encapsulated by tokio::sync::watch::{Sender, Receiver}.

Do you mean replacing my signal_receiver with a watch channel?

I am not entirely sure what else the signal_receiver is supposed to do here, but I notice that it interrupts the read, and that your deadlock goes away if the handler takes long enough (sleeping a few milliseconds). That means the signal is sent (in handle_output) right before the read stream is replaced, at the moment we are supposed to pick up the new one. It would be preferable to be notified after the stream has been replaced, so we are guaranteed to find the new one. tokio::sync::watch ties the update signal and the value together. You'd still need the reader in an Arc to avoid blocking the watch channel while reading.

Something like this (untested):

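let (reader_sender, mut reader_receiver) = tokio::sync::watch::channel(Arc::new(initial_reader));

let handle_read = tokio::spawn(async move {
   loop {
      // take a snapshot of the latest reader; the borrow guard is dropped at the end
      // of this statement, so the watch channel is not blocked while we read
      let reader = reader_receiver.borrow_and_update().clone();
      tokio::select! {
         r = reader_receiver.changed() => {
            r.expect("reader watcher closed unexpectedly");
         },
         // assumes read() can be called through a shared reference (&self)
         packet = reader.read() => {
            // ...
         }
      }
   }
});

let handle_output = tokio::spawn(async move {
   while let Some(new_reader) = get_new_reader().await {
      // replace the value first; send() also wakes changed() in handle_read
      reader_sender.send(Arc::new(new_reader)).expect("failed to send updated reader");
   }
});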