Using tokio::sync::Mutex with select! causes a deadlock

(1) There is a task queue wrapped in Arc<tokio::sync::Mutex>.
(2) The socket IO is split into a read task and a write task; the write task reads data from the task queue.
(3) The read and write tasks are combined with select!.


When the socket disconnects, the read_task exits first, which causes the write_task to be canceled. However, the Mutex lock held inside the write task does not seem to be released.



let (msg_sender, msg_receiver) = tokio::sync::mpsc::channel::<String>(10240);

let msg_receiver = Arc::new(tokio::sync::Mutex::new(msg_receiver));

let read_task = tokio::spawn(async move {
    read_from_socket(client_reader).await
});

let task_receiver = Arc::clone(&msg_receiver);
let write_task = tokio::spawn(async move {
    write_to_socket(client_writer, task_receiver).await
});

tokio::select! {
    e = read_task => {
        error!("({}) error:{:?}", bus_addr, e);
    },
    e = write_task => {
        error!("({}) error:{:?}", bus_addr, e);
    },
}

msg_receiver.lock().await; // deadlocks here





use tokio::io::AsyncWriteExt; // for write_all

async fn write_to_socket(mut writer: OwnedWriteHalf, socket_receiver: Arc<Mutex<mpsc::Receiver<String>>>) -> anyhow::Result<()> {
    // The guard is acquired once and held for the whole loop.
    let mut msg_receiver = socket_receiver.lock().await;
    loop {
        match msg_receiver.recv().await {
            // All senders were dropped: nothing more to write.
            None => { return Ok(()); }
            Some(d) => { writer.write_all(d.as_bytes()).await?; }
        };
    }
}


---------




After the socket disconnects, task_receiver.lock() deadlocks.

let msg_receiver = socket_receiver.lock().await;
loop {

It looks like msg_receiver is a lock guard, so the mutex stays locked for the entire duration of the task (the guard outlives the loop), and nothing else can lock it in the meantime.

You shouldn't need locks around channels at all. Delete the mutex?

But recv needs &mut self. If I delete the mutex, how can I use the receiver from multiple threads?

I assume you are using the mpsc channel of tokio, which, as its name indicates, can only have a single consumer/receiver.

If you need multiple receivers, you should use another type of channel. In tokio there is the broadcast channel, which is an M-to-N channel, but it clones each value to all receivers. If you need a channel that behaves like a load balancer, tokio does not have one; you can check out other crates such as flume and async-channel. Or you can build your own customized channel abstraction, e.g. on top of tokio's existing synchronization primitives such as locks, semaphores, and notifications.


At minimum, you'd need let mut tmp = receiver.lock().await; let msg = tmp.recv().await; drop(tmp); which acquires the exclusive lock right before getting each message and drops it immediately after receiving each message, to give other receivers a chance to also receive a message.

But using a channel inside a Mutex is clunky and inefficient. A channel is itself roughly equivalent to an Arc<Mutex<Vec<Message>>> internally, so you end up with Arc<Mutex<Arc<Mutex<Vec<Message>>>>>. What you need is either an mpmc channel or a different design that doesn't require the extra Mutex.