Await occurs here with maybe used later when combine tokio mutex with std mutex

Hi Community ! I have an issue with combining std::sync::mutex and tokio::sync::mutex. So, I refactored my app to use std::sync::mutex almost everywhere (except mutex where I store TCPStream reader and writer). And sometimes I got an error like:

error: future cannot be sent between threads safely
   --> src\client\mod.rs:404:9
    |
404 |         tokio::spawn(async move {
    |         ^^^^^^^^^^^^ future created by async block is not `Send`
    |
    = help: within `impl futures::Future<Output = [async output]>`, the trait `std::marker::Send` is not implemented for `std::sync::MutexGuard<'_, VecDeque<Vec<u8>>>`
note: future is not `Send` as this value is used across an await
   --> src\client\mod.rs:411:60
    |
408 |                         let mut output = output_queue.lock().unwrap();
    |                             ---------- has type `std::sync::MutexGuard<'_, VecDeque<Vec<u8>>>` which is not `Send`
...
411 |                                 match writer.write(&packet).await {
    |                                                            ^^^^^^ await occurs here, with `mut output` maybe used later
...
423 |                     },
    |                     - `mut output` is later dropped here

From what I understood from this issue, await always should be on top. But not sure. So trying to find common approach of how to fix the issue like this.

Below the code part (one of the parts) where I got an error. I annotated where I use std::sync::mutex or tokio::sync::mutex

async fn handle_write(&mut self) -> JoinHandle<()> {
        let output_queue = Arc::clone(&self._output_queue);
        let writer = Arc::clone(&self._writer);

        // std::sync::mutex
        let income_pipe = self._income_message_pipe.lock().unwrap();
        let mut message_income = income_pipe.message_income.clone();

        tokio::spawn(async move {
            loop {
                // tokio::sync::mutex
                match &mut *writer.lock().await {
                    Some(writer) => {
                        // std::sync::mutex
                        let mut output = output_queue.lock().unwrap();
                        if let Some(packet) = output.pop_front() {
                            if !packet.is_empty() {
                                match writer.write(&packet).await {
                                    Ok(bytes_amount) => {
                                        message_income.send_debug_message(
                                            format!("{} bytes sent", bytes_amount)
                                        );
                                    },
                                    Err(err) => {
                                        message_income.send_error_message(err.to_string());
                                    }
                                };
                            }
                        }
                    },
                    None => {
                        message_income.send_error_message(
                            String::from("Not connected to TCP")
                        );
                    },
                };

                sleep(Duration::from_millis(WRITE_TIMEOUT)).await;
            }
        })
    }

Could somebody explain me how to fix such issues ?

well, finally I refactored code and moved parts with lock() into variables, so now it's OK:

async fn handle_write(&mut self) -> JoinHandle<()> {
	let output_queue = Arc::clone(&self._output_queue);
	let writer = Arc::clone(&self._writer);

	let mut message_income = self._income_message_pipe.lock().unwrap().message_income.clone();

	tokio::spawn(async move {
		loop {
			match &mut *writer.lock().await {
				Some(writer) => {
					let packet = output_queue.lock().unwrap().pop_front();
					if packet.is_some() {
						let packet = packet.unwrap();
						if !packet.is_empty() {
							match writer.write(&packet).await {
								Ok(bytes_amount) => {
									message_income.send_debug_message(
										format!("{} bytes sent", bytes_amount)
									);
								},
								Err(err) => {
									message_income.send_error_message(err.to_string());
								}
							};
						}
					}
				},
				None => {
					message_income.send_error_message(
						String::from("Not connected to TCP")
					);
				},
			};

			sleep(Duration::from_millis(WRITE_TIMEOUT)).await;
		}
	})
}

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.