Tokio Mutex never unlocks; it seems to be locked from another thread

In my TCP client I have multiple threads to handle responses. To avoid blocking, I split the TcpStream into a separate reader and writer. Under some conditions I need to replace the reader and writer by calling connect again, but reader.lock().await and writer.lock().await never return, apparently because I previously locked them from another thread (in the handle_read and handle_write methods).

This is my code:

pub struct Reader {
    stream: OwnedReadHalf,
    // ... rest fields
}

pub struct Writer {
    stream: OwnedWriteHalf,
    // ... rest fields
}

pub struct Client {
    reader: Arc<Mutex<Option<Reader>>>,
    writer: Arc<Mutex<Option<Writer>>>,
    // ... rest fields
}

impl Client {
    pub async fn connect(&mut self, host: &str, port: u16) {
        let stream = Self::connect_inner(host, port).await;
        let (rx, tx) = stream.into_split();

        let mut reader = self.reader.lock().await;
        *reader = Some(Reader::new(rx));
        let mut writer = self.writer.lock().await;
        *writer = Some(Writer::new(tx));
    }

    async fn connect_inner(host: &str, port: u16) -> TcpStream {
        let addr = format!("{}:{}", host, port);
        match TcpStream::connect(&addr).await {
            Ok(stream) => {
                println!("Connected to {}", addr);
                stream
            },
            _ => {
                panic!("Cannot connect");
            },
        }
    }

    pub async fn handle_connection(&mut self) {
        loop {
            timeout(Duration::from_millis(TIMEOUT), self.handle_read()).await;
            self.handle_queue().await;
            timeout(Duration::from_millis(TIMEOUT), self.handle_write()).await;
        }
    }

    async fn handle_queue(&mut self) {
        let reader = Arc::clone(&self.reader);
        let writer = Arc::clone(&self.writer);

        tokio::spawn(async move {
            // ... rest code
            let stream = Self::connect_inner(&host, port).await;
            let (rx, tx) = stream.into_split();

            // this await never finished
            let mut reader = reader.lock().await;
            *reader = Some(Reader::new(rx));
            let mut writer = writer.lock().await;
            *writer = Some(Writer::new(tx));
            // ... rest code
        }).await.unwrap()
    }

    async fn handle_read(&mut self) {
        let reader = Arc::clone(&self.reader);

        tokio::spawn(async move {
            let lock = reader.lock();

            match &mut *lock.await {
                Some(reader) => {
                    let raw_data = reader.read().await.unwrap();
                    // ... rest code
                },
                _ => {},
            }
        }).await.unwrap()
    }

    async fn handle_write(&mut self) {
        let writer = Arc::clone(&self.writer);
        // ... rest code

        tokio::spawn(async move {
            match output_queue.lock().await.pop_front() {
                Some(packet) => {
                    if !packet.is_empty() {
                        let lock = writer.lock();
                        match &mut *lock.await {
                            Some(writer) => {
                                writer.write(&packet).await;
                            },
                            _ => {},
                        }
                    }
                },
                _ => {},
            }
        }).await.unwrap()
    }
}

My first idea was to set a read/write timeout, but Tokio's stream doesn't seem to have a set_timeout method, so I tried tokio::time::timeout instead (inside the handle_connection method). That didn't help: the reader.lock().await and writer.lock().await calls in handle_queue still never return.

I also tried dropping the lock:

let lock = reader.lock();
drop(lock);

but that didn't help either.

Could somebody explain why this happens and how to fix it?

I'm not sure exactly what's happening with the Mutex, perhaps I am missing some context, but you're doing a lot of tokio::spawn(...).await, which usually doesn't make sense. Spawning is intended to run a future concurrently; if you immediately await the result, you get no concurrency from it, and you might just as well write the code inline, without the spawn.


When I omit the .await there, the spawning eats all the memory. It seems like threads are spawned without limit in the loop.

You should omit both the spawn call and the await.

Could you clarify what you mean? Without spawn, how do I create a new thread?

You're not creating a new thread currently. You create a new task and suspend the current one until the created task is finished, which is semantically the same as just running the code inside the spawn directly.


Anyway, I need to wait until the previous task finishes before the next one starts. For example, handle_read and handle_write can do nothing for some period; if I start a new read/write task anyway, there will be multiple read/write tasks that do nothing. So is await still not OK in this case? If so, could you advise what to use here instead?

So don't create the task. Run its logic right here.

I need to read and write packets in different threads because I don't want them to be blocked by other tasks.

I feel I do not need tokio for this purpose.

Then don't await the task. Spawn it and let it run to completion. If you need something to signal the completion, make it capture the sender side of a oneshot channel.

If the task itself is synchronous, you don't need Tokio; you can just drop all the async and thread::spawn everything.
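For the synchronous route, a sketch using only the standard library: a dedicated OS thread owns the reading side and hands each chunk to the rest of the program over an std::sync::mpsc channel, so no shared Mutex around the reader is needed at all. A Cursor stands in for a blocking std::net::TcpStream (anything implementing Read works); spawn_reader and the tiny 4-byte buffer are hypothetical choices for illustration.

```rust
use std::io::{Cursor, ErrorKind, Read};
use std::sync::mpsc;
use std::thread;

/// Spawns a dedicated OS thread that owns `source`, reads it with blocking
/// calls and forwards every chunk over a channel.
fn spawn_reader<R: Read + Send + 'static>(
    mut source: R,
) -> (thread::JoinHandle<()>, mpsc::Receiver<Vec<u8>>) {
    let (tx, rx) = mpsc::channel();
    let handle = thread::spawn(move || {
        let mut buf = [0u8; 4]; // tiny buffer so the demo produces several chunks
        loop {
            match source.read(&mut buf) {
                Ok(0) => break, // EOF: the peer closed the connection
                Ok(n) => {
                    // Stop if the receiving side has gone away.
                    if tx.send(buf[..n].to_vec()).is_err() {
                        break;
                    }
                }
                Err(e) if e.kind() == ErrorKind::Interrupted => continue,
                Err(_) => break,
            }
        }
        // `tx` is dropped here, which ends the receiver's iterator.
    });
    (handle, rx)
}

fn main() {
    let (handle, chunks) = spawn_reader(Cursor::new(b"hello world".to_vec()));
    for chunk in chunks {
        println!("got {:?}", String::from_utf8_lossy(&chunk));
    }
    handle.join().unwrap();
}
```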


Well, it seems one of my latest changes fixed that: after I removed .await.unwrap() from tokio::spawn, there are no memory leaks anymore. But I still cannot replace the reader and writer; the Mutex seems to be always locked.

To clarify: the issue I started this topic for is not solved yet.

What is output_queue in handle_write ?

Both input_queue and output_queue are:

input_queue: Arc<Mutex<VecDeque<Vec<u8>>>>,
output_queue: Arc<Mutex<VecDeque<Vec<u8>>>>,

The bug is not obvious from the code you have posted. Is it possible to share more context?


Sure. The issue is that the reader instance locks itself on each iteration of the loop:

impl Reader {
    pub fn new(reader: OwnedReadHalf) -> Self {
        Self {
            stream: reader,
            decryptor: None,
        }
    }

    pub fn init(&mut self, session_key: &[u8]) {
        self.decryptor = Some(Decryptor::new(session_key));
    }

    pub async fn read(&mut self) -> Result<Vec<u8>, Error> {
        let mut buffer = [0u8; 4096];

        // locked here
        if let Ok(bytes_count) = self.stream.read(&mut buffer).await {
            let raw_data = match self.decryptor.as_mut() {
                Some(decryptor) => decryptor.decrypt(&buffer[..bytes_count]),
                _ => buffer[..bytes_count].to_vec(),
            };

            return Ok(raw_data);
        }

        Err(Error::new(ErrorKind::NotFound, "No data read"))
    }
}

And this is how I use the reader:

async fn handle_read(&mut self) {
    let reader = Arc::clone(&self.reader);

    tokio::spawn(async move {
        let lock = reader.lock();

        match &mut *lock.await {
            Some(reader) => {
                let raw_data = reader.read().await.unwrap();
                // ... rest code
            },
            _ => {},
        }
    }).await.unwrap()
}

Since stream.read waits until it receives something, the reader appears to be always locked. But I cannot use try_read, because in my case it does not read the data from the server.

So when I try to lock the reader from another thread, I can't:

async fn handle_queue(&mut self) {
    let reader = Arc::clone(&self.reader);
    let writer = Arc::clone(&self.writer);

    tokio::spawn(async move {
        // ... rest code
        let stream = Self::connect_inner(&host, port).await;
        let (rx, tx) = stream.into_split();

        // cannot lock the reader: it seems to be already locked inside handle_read
        let mut reader = reader.lock().await;
        *reader = Some(Reader::new(rx));
        let mut writer = writer.lock().await;
        *writer = Some(Writer::new(tx));
        // ... rest code
    }).await.unwrap()
}

P.S. I am really sorry, I forgot to include impl Reader in my original question.

Solved! I added a sleep to each thread. Now it works as expected.

Thanks very much to everyone who spent time helping me!

That sleep helps sounds very suspicious. Your code will probably lock up randomly, depending on the timing of the sleep relative to other events.

If sleep helps, then it's possible that tokio::task::yield_now will help too, and faster.

BTW, tokio::spawn does not guarantee that the task will run on another thread. Even in a multi-threaded runtime, tokio::spawn(foo).await may end up being scheduled in a way that makes it identical to foo.await.


So, you advise not to use tokio::spawn for multi-threading? If so, could you advise what to use instead?

Well, if you need a new thread, spawn a new thread, i.e. use std::thread::spawn. If you need something more high-level, there are two de facto standard options: rayon, when you simply want to parallelize a chunk of byte-crunching, and crossbeam, when you need more fine-grained control.
