Use of moved value: `tcp1` ERROR even after passing it as mutable reference

Hi Team,

Need a help in below code:
we are trying to connect and shutdown the connection in separate thread but while passing tcp streams as mutable reference and address, and creating the connection, we are getting below error in tokio:spawn async line:

use of moved value: tcp1

value moved here, in previous iteration of looprustc(E0382)

cli.rs(12, 9): move occurs because tcp1 has type std::result::Result<std::net::TcpStream, std::io::Error>, which does not implement the Copy trait

cli.rs(47, 21): this reinitialization might get skipped

cli.rs(71, 37): value moved here, in previous iteration of loop

cli.rs(72, 62): use occurs due to use in generator

use std::net::{TcpStream};
use std::io::{self, Read, Write, Result};
use std::net::Shutdown;
use std::time::SystemTime;
use tokio::net::{TcpListener};
use tokio_stream::StreamExt;
use tokio_util::codec::LengthDelimitedCodec;
use std::{thread, time};
// use std::io;
​​
fn main(){
    let mut addr = String::from("127.0.0.1:8000");
    let mut tcp1 = TcpStream::connect(&addr);
    let mut tcp2 = TcpStream::connect(&addr);
​
    let mut tcp_timer_x = SystemTime::now();
    let mut tcp_flag = 1;
​
    loop{
        let tcp_timer_y :u64= tcp_timer_x.elapsed().unwrap().as_secs().try_into().unwrap();
      
        if tcp_timer_y > 5{
            println!("Switching now -------- $$$$$$$$$$$$$");
            if tcp_flag == 1 {
                if tcp2.is_ok(){
                    
                        tcp_flag = 2;
                        println!("TCP2");
                        // t.shutdown(Shutdown::Both).expect("shutdown call failed");
                        // println!("TCP1 shutdown");
                        // t.write("hey this is tcp2".as_bytes()).unwrap();
                    }
                   else {
                        println!("Error in Tcp2");
                    }
​              
                }
​
                if tcp_flag==2{
                    
                    tcp1 = TcpStream::connect(&addr);
                    if tcp1.is_ok(){
                    
                        tcp_flag = 2;
                        println!("TCP2");
                     
                    }
                   else {
                        println!("Error in Tcp2");
                    }
​            
            }
​
            tokio::spawn(async move {
                if let Err(e) = shutandreconn(tcp_flag, &mut tcp1.unwrap(), &mut tcp2.unwrap(), &mut addr ).await {
                    println!("failed to process connection; error = {}", e);
                }
            });
          ​
             }
            tcp_timer_x = SystemTime::now();
        }
​
        let secs = time::Duration::from_secs(1);
        thread::sleep(secs);    
        println!("Using tcp {:?}",tcp_flag);
​
    }
    
async fn shutandreconn(mut tcpflag: i32,  t1: &mut TcpStream ,  t2: &mut TcpStream, addrs: &mut std::string::String ) -> io::Result<()>
{
    if(tcpflag==1){
       *t2= TcpStream::connect(addrs.to_string()).unwrap();
        t1.shutdown(Shutdown::Both).expect("shutdown call failed");
    }
    else{
        *t1= TcpStream::connect(addrs.to_string()).unwrap();
        t2.shutdown(Shutdown::Both).expect("shutdown call failed");
    }
    Ok(())
}

Please suggest, how to solve this error.

Minimized example:

use std::net::TcpStream;

fn main() {
    let tcp1 = TcpStream::connect("127.0.0.1:8000");

    loop {
        tokio::spawn(async move { shutandreconn(&mut tcp1.unwrap()).await });
    }
}

async fn shutandreconn(_t1: &mut TcpStream) {}

Playground


The most obvious problem is that tcp1 is moved into the async move block, and only inside it is borrowed. Of course, this leaves the value inaccessible on the next iteration.
If we remove the move, however, the error remains the same. The reason this time is that unwrap takes ownership of the Result, so it essentially forces the move of tcp1 into the block, no matter how it is marked. Let's try and move the unwrap out of the block...

use std::net::TcpStream;

fn main() {
    let mut tcp1 = TcpStream::connect("127.0.0.1:8000").unwrap();

    loop {
        tokio::spawn(async { shutandreconn(&mut tcp1).await });
    }
}

async fn shutandreconn(_t1: &mut TcpStream) {}

Now there are two errors at once:

error[E0499]: cannot borrow `tcp1` as mutable more than once at a time
 --> src/main.rs:7:28
  |
7 |         tokio::spawn(async { shutandreconn(&mut tcp1).await });
  |                      ------^^^^^^^^^^^^^^^^^^^^^----^^^^^^^^^
  |                      |     |                    |
  |                      |     |                    borrows occur due to use of `tcp1` in generator
  |                      |     `tcp1` was mutably borrowed here in the previous iteration of the loop
  |                      argument requires that `tcp1` is borrowed for `'static`

error[E0373]: async block may outlive the current function, but it borrows `tcp1`, which is owned by the current function
 --> src/main.rs:7:28
  |
7 |         tokio::spawn(async { shutandreconn(&mut tcp1).await });
  |                            ^^^^^^^^^^^^^^^^^^^^^----^^^^^^^^^
  |                            |                    |
  |                            |                    `tcp1` is borrowed here
  |                            may outlive borrowed value `tcp1`
  |
  = note: async blocks are not executed immediately and must either take a reference or ownership of outside variables they use
help: to force the async block to take ownership of `tcp1` (and any other referenced variables), use the `move` keyword
  |
7 |         tokio::spawn(async move { shutandreconn(&mut tcp1).await });
  |                            ++++

Well, at least we see now why did you use move at the first place. Seems that the problem is unsolvable, then - tokio::spawn requires 'static, so the passed future cannot borrow from its environment.

But now - why this requirement is here?
Well, that's because tokio::spawn does exactly what is written - it spawns the task in a "fire-and-forget" fashion, i.e. this task is allowed to run arbitrarily long. It can run until the next iteration of the loop - and now we have a problem, since we're trying to use tcp1, which is exclusively borrowed!

Looking at the original code, however, I assume that you do not want to use tokio::spawn. It seems that you want to block the current thread, until the future returned by shutandreconn resolves.
To do this, we must explicitly create the Tokio runtime and execute our future in it:

use tokio::runtime::Runtime;
use std::net::TcpStream;

fn main() {
    let mut tcp1 = TcpStream::connect("127.0.0.1:8000").unwrap();
    let runtime = Runtime::new().unwrap();

    loop {
        runtime.block_on(async { shutandreconn(&mut tcp1).await });
    }
}

async fn shutandreconn(_t1: &mut TcpStream) {}

This compiles successfully.

3 Likes

Thank you for your elaborated response. It was really helpful.

If I understood you correctly -
Here if we block the current thread and it will waste current thread's time and our program will take more time to complete the write(of suppose one chunk -- which is equals to loosing few secs or msec) RIght?
on your below point -
But now - why this requirement is here?
Well, that's because tokio::spawn does exactly what is written - it spawns the task in a "fire-and-forget" fashion, i.e. this task is allowed to run arbitrarily long. It can run until the next iteration of the loop - and now we have a problem, since we're trying to use tcp1 , which is exclusively borrowed!

In order to avoid this(wastage of main thread's time) only we planned to shift the connect and shutdown part to async.
And Even if it(async thread) can run until the next iteration of the loop, I don't think there will be an issue. because untill then our previous connection will be active and write will keep on happening with that conn, and unless out new conn gets active, the flag won't change.
What do you think/suggest?

Of course. If the connection failed, you can't write to it anyway.

Here we have 2 connections. and we are switching. both the connections are active initially and we shutdown and reconnect alternatively based on flag change. so one connection will be always active because the flag will only change after checking on the another connection(if its active or not)

Hi,
Any update/suggestions on this?

How can I implement this logic some other way where I can have both the threads running concurrently. one thread for writing and one thread for reconnect and shutdown?

In this case, you don't need async. Just launch the reconnection thread, let it capture crossbeam::channel::Receiver, and send the commands for reconnection through this channel. And, of course, don't forget that this approach will require sharing the TCP channels with Arc<Mutex<_>>, since otherwise Rust won't be able to check that there is no data race.

1 Like