How to split tokio TcpStream to use halves in separate tasks correctly?

Hi Community! I implemented a TCP client using tokio and ran into a lifetime issue when I tried to use ReadHalf and WriteHalf in separate methods:

error[E0726]: implicit elided lifetime not allowed here
   --> src/main.rs:105:61
    |
105 |     async fn handle_write(&mut self, transmitter: Arc<Mutex<WriteHalf>>) {
    |                                                             ^^^^^^^^^- help: indicate the anonymous lifetime: `<'_>`
    |
    = note: assuming a `'static` lifetime...

and this one:

error[E0759]: `self` has an anonymous lifetime `'_` but it needs to satisfy a `'static` lifetime requirement
  --> src/main.rs:69:36
   |
69 |     pub async fn handle_connection(&mut self) {
   |                                    ^^^^^^^^^ this data with an anonymous lifetime `'_`...
...
80 |                 self.handle_read(Arc::clone(&receiver))
   |                                             --------- ...is used and required to live as long as `'static` here

My code is below:

use std::collections::VecDeque;
use std::time::Duration;
use std::sync::{Arc};

use tokio::net::{
    TcpStream,
    tcp::{
        ReadHalf,
        WriteHalf
    }
};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::time::timeout;
use tokio::sync::{Mutex};

const TIMEOUT: u64 = 250;

pub struct HeaderCrypt {}
impl HeaderCrypt {
    pub fn encrypt(&mut self, _data: &[u8]) -> Vec<u8> {
        vec![]
    }
    pub fn decrypt(&mut self, _data: &[u8]) -> Vec<u8> {
        vec![]
    }
}

pub struct Session {
    pub header_crypt: Option<HeaderCrypt>,
}
impl Session {
    pub fn new() -> Self {
        Self {
            header_crypt: None,
        }
    }
}

pub struct Client {
    stream: Option<TcpStream>,
    input_queue: Arc<Mutex<VecDeque<Vec<u8>>>>,
    output_queue: Arc<Mutex<VecDeque<Vec<u8>>>>,
    session: Arc<Mutex<Session>>,
}

impl Client {
    pub fn new() -> Self {
        Self {
            stream: None,
            input_queue: Arc::new(Mutex::new(VecDeque::new())),
            output_queue: Arc::new(Mutex::new(VecDeque::new())),
            session: Arc::new(Mutex::new(Session::new())),
        }
    }

    pub async fn connect(&mut self, host: &str, port: u16) {
        let addr = format!("{}:{}", host, port);
        match TcpStream::connect(&addr).await {
            Ok(stream) => {
                self.stream = Some(stream);
                println!("Connected to {}", addr);
            },
            _ => {
                panic!("Cannot connect");
            },
        }
    }

    pub async fn handle_connection(&mut self) {
        let packet = vec![1, 2, 3, 4, 5];
        self.output_queue.lock().await.push_back(packet);

        let (rx, tx) = self.stream.as_mut().unwrap().split();
        let receiver = Arc::new(Mutex::new(rx));
        let transmitter = Arc::new(Mutex::new(tx));

        loop {
            timeout(
                Duration::from_millis(TIMEOUT),
                self.handle_read(Arc::clone(&receiver))
            ).await;

            self.handle_queue().await;

            timeout(
                Duration::from_millis(TIMEOUT),
                self.handle_write(Arc::clone(&transmitter))
            ).await;
        }
    }

    async fn handle_queue(&mut self) {
        let input_queue = Arc::clone(&self.input_queue);

        tokio::spawn(async move {
            match input_queue.lock().await.pop_front() {
                Some(packet) if !packet.is_empty() => {
                    println!("PACKET: {:?}", packet);
                },
                _ => {},
            }
        }).await.unwrap()
    }

    async fn handle_write(&mut self, transmitter: Arc<Mutex<WriteHalf>>) {
        let session = Arc::clone(&self.session);
        let output_queue = Arc::clone(&self.output_queue);

        tokio::spawn(async move {
            let packet = match output_queue.lock().await.pop_front() {
                Some(packet) => match session.lock().await.header_crypt.as_mut() {
                    Some(header_crypt) => header_crypt.encrypt(&packet),
                    _ => packet,
                },
                _ => vec![],
            };

            if !packet.is_empty() {
                println!("PACKET: {:?}", &packet);
                let mut lock = transmitter.lock().await;
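                // write_all keeps writing until the whole packet is sent;
                // write may send only part of the buffer.
                lock.write_all(&packet).await.unwrap();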
                lock.flush().await.unwrap();
            }
        }).await.unwrap()
    }

    async fn handle_read(&mut self, receiver: Arc<Mutex<ReadHalf>>) {
        let queue = Arc::clone(&self.input_queue);
        let session = Arc::clone(&self.session);

        tokio::spawn(async move {
            let mut buffer = [0u8; 4096];

            let mut lock = receiver.lock().await;

            match lock.read(&mut buffer).await {
                Ok(bytes_count) => {
                    println!("{}", &bytes_count);
                    let raw_data = match session.lock().await.header_crypt.as_mut() {
                        Some(header_crypt) => header_crypt.decrypt(&buffer[..bytes_count]),
                        _ => buffer[..bytes_count].to_vec(),
                    };

                    queue.lock().await.push_back(raw_data);
                },
                _ => {},
            };
        }).await.unwrap()
    }
}

What I am trying to achieve: I need my app to read packets from the server in a separate thread and put them into input_queue in the same order they arrive from the server (so that they can be decrypted). Once a packet is added to input_queue, I need to process it in a separate thread (without blocking I/O) and push the result into output_queue. Once a packet appears in output_queue, I need to write it back to the server in a separate thread.

Maybe I do not need so many threads; I just need to make sure that reads and writes are not blocked by other parts of the code.

The playground with implementation.

Could somebody help me fix this?

It seems to compile with OwnedReadHalf and OwnedWriteHalf but I didn't run the code!

use std::collections::VecDeque;
use std::sync::Arc;
use std::time::Duration;

use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
use tokio::net::TcpStream;
use tokio::sync::Mutex;
use tokio::time::timeout;

const TIMEOUT: u64 = 250;

pub struct HeaderCrypt {}
impl HeaderCrypt {
    pub fn encrypt(&mut self, _data: &[u8]) -> Vec<u8> {
        vec![]
    }
    pub fn decrypt(&mut self, _data: &[u8]) -> Vec<u8> {
        vec![]
    }
}

#[derive(Default)]
pub struct Session {
    pub header_crypt: Option<HeaderCrypt>,
}
impl Session {
    pub fn new() -> Self {
        Self::default()
    }
}

#[derive(Default)]
pub struct Client {
    reader: Arc<Mutex<Option<OwnedReadHalf>>>,
    writer: Arc<Mutex<Option<OwnedWriteHalf>>>,
    input_queue: Arc<Mutex<VecDeque<Vec<u8>>>>,
    output_queue: Arc<Mutex<VecDeque<Vec<u8>>>>,
    session: Arc<Mutex<Session>>,
}

impl Client {
    pub fn new() -> Self {
        Self::default()
    }

    pub async fn connect(&mut self, host: &str, port: u16) {
        let addr = format!("{}:{}", host, port);
        match TcpStream::connect(&addr).await {
            Ok(stream) => {
                let (reader, writer) = stream.into_split();
                self.reader = Arc::new(Mutex::new(Some(reader)));
                self.writer = Arc::new(Mutex::new(Some(writer)));
                println!("Connected to {}", addr);
            }
            _ => {
                panic!("Cannot connect");
            }
        }
    }

    pub async fn handle_connection(&mut self) {
        let packet = vec![1, 2, 3, 4, 5];
        self.output_queue.lock().await.push_back(packet);

        loop {
            timeout(Duration::from_millis(TIMEOUT), self.handle_read())
                .await
                .unwrap();
            timeout(Duration::from_millis(TIMEOUT), self.handle_queue())
                .await
                .unwrap();
            timeout(Duration::from_millis(TIMEOUT), self.handle_write())
                .await
                .unwrap();
        }
    }

    async fn handle_queue(&mut self) {
        let input_queue = Arc::clone(&self.input_queue);

        tokio::spawn(async move {
            match input_queue.lock().await.pop_front() {
                Some(packet) if !packet.is_empty() => {
                    println!("PACKET: {:?}", packet);
                }
                _ => {}
            }
        })
        .await
        .unwrap()
    }

    async fn handle_write(&mut self) {
        let writer = self.writer.clone();
        let session = Arc::clone(&self.session);
        let output_queue = Arc::clone(&self.output_queue);

        tokio::spawn(async move {
            println!("I see output here");
            match writer.lock().await.as_mut() {
                Some(writer) => {
                    println!("Stream exists -> do not output");
                    let packet = match output_queue.lock().await.pop_front() {
                        Some(packet) => match session.lock().await.header_crypt.as_mut() {
                            Some(header_crypt) => header_crypt.encrypt(&packet),
                            _ => packet,
                        },
                        _ => vec![],
                    };

                    if !packet.is_empty() {
                        println!("My packet do not displayed: {:?}", &packet);
                        writer.write_all(&packet).await.unwrap();
                        writer.flush().await.unwrap();
                    }
                }
                _ => {
                    println!("No stream -> do not output");
                }
            };
        })
        .await
        .unwrap()
    }

    async fn handle_read(&mut self) {
        let queue = Arc::clone(&self.input_queue);
        let reader = Arc::clone(&self.reader);
        let session = Arc::clone(&self.session);

        tokio::spawn(async move {
            if let Some(reader) = reader.lock().await.as_mut() {
                println!("Stream exists -> and I see output !");
                let mut buffer = [0u8; 4096];
                if let Ok(bytes_count) = reader.read(&mut buffer).await {
                    println!("{}", &bytes_count);
                    let raw_data = match session.lock().await.header_crypt.as_mut() {
                        Some(header_crypt) => header_crypt.decrypt(&buffer[..bytes_count]),
                        _ => buffer[..bytes_count].to_vec(),
                    };
                    queue.lock().await.push_back(raw_data);
                }
            }
        })
        .await
        .unwrap()
    }
}

#[tokio::main]
async fn main() {
    let mut client = Client::new();
    client.connect("127.0.0.1", 3724).await;
    client.handle_connection().await;
}

The as_mut() call in self.stream.as_mut().unwrap().split() makes it a temporary loan that is bound to just this scope, and forbids the result from being used anywhere else. split() inherits this limitation, so the split parts are also forbidden from being used outside the scope of the as_mut() call. Arc can't undo this temporary nature of references. An Arc holding a reference is just as restricted as the reference itself.

You can't use as_mut() here if you want its result to escape that function. You'll probably need to remove self.stream and split it earlier, or keep the stream in an Option and use self.stream.take().unwrap() to take ownership of the stream when splitting it.
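As a rough illustration of the take() suggestion, here is a minimal sketch that uses only the standard library so it runs without dependencies. The names Conn, take_stream and sum_in_thread are made up for this example, a Vec<u8> stands in for the TcpStream, and std::thread::spawn stands in for tokio::spawn (both require the moved data to be 'static).

```rust
use std::thread;

// Hypothetical stand-in for the client; the real code would hold a TcpStream.
struct Conn {
    stream: Option<Vec<u8>>,
}

impl Conn {
    // take() moves the value out of the Option and leaves None behind,
    // so the returned value is owned and not tied to the borrow of self.
    fn take_stream(&mut self) -> Option<Vec<u8>> {
        self.stream.take()
    }
}

// Moves the owned value into a spawned thread. This would not compile with
// as_mut(), because the thread could outlive the borrow of `conn`.
fn sum_in_thread(conn: &mut Conn) -> Option<u32> {
    let data = conn.take_stream()?;
    let handle = thread::spawn(move || data.iter().map(|b| u32::from(*b)).sum::<u32>());
    handle.join().ok()
}
```

With tokio, the analogous step would be to call into_split() on the taken stream, which gives owned halves like the ones used in the code earlier in this thread.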

When run, it panics with "thread 'main' panicked at 'called Result::unwrap() on an Err value: Elapsed(())', src\main.rs:69:18". This is the .unwrap() inside handle_connection:

timeout(Duration::from_millis(TIMEOUT), self.handle_read())
                .await
                .unwrap();

I tried to do almost what @adrien-zinger suggested (it's related to your "split earlier" suggestion), but got other errors:

error[E0507]: cannot move out of dereference of `tokio::sync::MutexGuard<'_, Option<tokio::net::tcp::OwnedReadHalf>>`
   --> src/main.rs:125:24
    |
125 |             let lock = receiver.lock().await.unwrap().borrow_mut();
    |                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ move occurs because value has type `Option<tokio::net::tcp::OwnedReadHalf>`, which does not implement the `Copy` trait
    |
help: consider borrowing the `Option`'s content
    |
125 |             let lock = receiver.lock().await.unwrap().as_ref().borrow_mut();
    |                                                      +++++++++

and

error[E0716]: temporary value dropped while borrowed
   --> src/main.rs:125:24
    |
125 |             let lock = receiver.lock().await.unwrap().borrow_mut();
    |                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^             - temporary value is freed at the end of this statement
    |                        |
    |                        creates a temporary which is freed while still in use
126 | 
127 |             match lock.read(&mut buffer).await {
    |                   ---------------------- borrow later used here
    |
    = note: consider using a `let` binding to create a longer lived value

and

error[E0507]: cannot move out of dereference of `tokio::sync::MutexGuard<'_, Option<tokio::net::tcp::OwnedWriteHalf>>`
   --> src/main.rs:110:39
    |
110 |                 let mut transmitter = transmitter.lock().await.unwrap();
    |                                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ move occurs because value has type `Option<tokio::net::tcp::OwnedWriteHalf>`, which does not implement the `Copy` trait
    |
help: consider borrowing the `Option`'s content
    |
110 |                 let mut transmitter = transmitter.lock().await.unwrap().as_ref();
    |                                                                        +++++++++

Playground is here.

Variables in Rust are semantically meaningful.

x.lock().as_ref();

is different from:

let tmp = x.lock();
tmp.as_ref();

because the variable keeps the lock guard alive for as long as the variable exists. If you write it as a one-liner, the guard is a temporary that is dropped at the end of that statement, which unlocks the mutex. The compiler complains that you're trying to keep a reference to the guarded object after the guard protecting it has stopped existing.
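A small sketch of the difference, using std::sync::Mutex as a stand-in so it runs without tokio (with tokio's Mutex you would write lock().await instead of lock().unwrap(), but the guard follows the same scoping rules). The function names are invented for this example.

```rust
use std::sync::Mutex;

// Binding the guard to a variable keeps the mutex locked until `guard`
// goes out of scope, so references obtained through it stay valid.
fn len_with_named_guard(m: &Mutex<Option<String>>) -> usize {
    let guard = m.lock().unwrap();
    let inner: Option<&String> = guard.as_ref();
    inner.map_or(0, |s| s.len())
}

// A one-liner is fine as long as nothing borrowed from the guard
// outlives the statement: here only a usize leaves the expression.
fn len_with_temporary_guard(m: &Mutex<Option<String>>) -> usize {
    let n = m.lock().unwrap().as_ref().map_or(0, |s| s.len());
    n
}

// By contrast, this is rejected (E0716) once `inner` is used afterwards,
// because the guard is dropped at the end of the `let` statement:
//
//     let inner = m.lock().unwrap().as_ref();
//     inner.map_or(0, |s| s.len())
```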

When I try to refactor this into multiple variables, I get another error.
My code:

async fn handle_read(&mut self) {
        let queue = Arc::clone(&self.input_queue);
        let session = Arc::clone(&self.session);
        let receiver = Arc::clone(&self.receiver);

        tokio::spawn(async move {
            let mut buffer = [0u8; 4096];

            let lock = receiver.lock();
            let awaitable = lock.await;

            match awaitable {
                Some(reader) => {
                    match reader.read(&mut buffer).await {
                        Ok(bytes_count) => {
                            println!("{}", &bytes_count);
                            let raw_data = match session.lock().await.header_crypt.as_mut() {
                                Some(header_crypt) => header_crypt.decrypt(&buffer[..bytes_count]),
                                _ => buffer[..bytes_count].to_vec(),
                            };
        
                            queue.lock().await.push_back(raw_data);
                        },
                        _ => {},
                    };
                },
                _ => {},
            }
        }).await.unwrap()
    }

error:

error[E0308]: mismatched types
   --> src/main.rs:128:17
    |
127 |             match awaitable {
    |                   --------- this expression has type `tokio::sync::MutexGuard<'_, Option<tokio::net::tcp::OwnedReadHalf>>`
128 |                 Some(reader) => {
    |                 ^^^^^^^^^^^^ expected struct `tokio::sync::MutexGuard`, found enum `Option`
    |
    = note: expected struct `tokio::sync::MutexGuard<'_, Option<tokio::net::tcp::OwnedReadHalf>, >`
                 found enum `Option<_>`

It seems like await cannot be used inside match, but I cannot understand what is wrong.

In general, await can be used in match.

The result you get from the lock() call is not the object held by the mutex. It's a wrapper type that keeps the mutex locked and prevents you from taking the object and using it without the mutex (that would be unsafe).

If you have Mutex<Option<Stream>>, then lock() gives you MutexGuard<Option<Stream>>, not the Option<Stream>.

However, MutexGuard implements the Deref and DerefMut traits, which allow you to use the type it wraps, but only via a temporary reference.

Try match &mut *awaitable. This will access the content of the mutex guard by reference instead of matching guard's type directly.
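For illustration, a minimal sketch of that pattern with std::sync::Mutex standing in for tokio's Mutex so it runs without dependencies (the tokio guard dereferences the same way; only the lock().await call differs). The function push_if_present is a made-up example.

```rust
use std::sync::Mutex;

// Appends a byte to the buffer if one is present and returns its new length.
fn push_if_present(m: &Mutex<Option<Vec<u8>>>, byte: u8) -> Option<usize> {
    let mut guard = m.lock().unwrap();
    // `*guard` goes through DerefMut to the Option inside the mutex;
    // `&mut` reborrows it, so the match neither compares against the
    // guard type nor tries to move the Option out of the guard.
    match &mut *guard {
        Some(buf) => {
            buf.push(byte);
            Some(buf.len())
        }
        None => None,
    }
}
```

Matching on guard directly would fail with the same "expected struct MutexGuard, found enum Option" mismatch shown above.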

Thank you very much for the detailed explanation! It seems the code compiles now; I will check whether it works as expected:

async fn handle_read(&mut self) {
        let queue = Arc::clone(&self.input_queue);
        let session = Arc::clone(&self.session);
        let receiver = Arc::clone(&self.receiver);

        tokio::spawn(async move {
            let mut buffer = [0u8; 4096];

            let lock = receiver.lock();

            match &mut *lock.await {
                Some(reader) => {
                    match reader.read(&mut buffer).await {
                        Ok(bytes_count) => {
                            println!("{}", &bytes_count);
                            let raw_data = match session.lock().await.header_crypt.as_mut() {
                                Some(header_crypt) => header_crypt.decrypt(&buffer[..bytes_count]),
                                _ => buffer[..bytes_count].to_vec(),
                            };
        
                            queue.lock().await.push_back(raw_data);
                        },
                        _ => {},
                    };
                },
                _ => {},
            }
        }).await.unwrap()
    }

P.S. It seems the awaitable variable is redundant here, so I just use match &mut *lock.await

P.P.S. Works as expected!