How to impl async client auto reconnect to server when disconnect

Sorry for my pool English first!
I have a scene :I need to use tokio tcpstream keepalive conncet to a server .when disconncet or err,I need to reconnect to the server again after every 30 secs. other I rcv msg from Upper using mpsc channel ,then do some opr ,and then send the result to server.At the same time I rcv msg from server then do some opr ,then send to upper . I have write code using tokio loop select!but I can not solve the reconnect promblem. Awesome rust boss help me!


impl ComOpr_232 {
  
    pub async fn run(&mut self, shutdown_complete_tx_comx: mpsc::Sender<()>) {
        let mut interval = time::interval(Duration::from_millis(160));
        let mut interval_3s = time::interval(Duration::from_secs(3));
        let mut interval_60s = time::interval(Duration::from_secs(60));

        //12 sec se
        let mut com_stat_send_index: usize = 0;

        let mut buf: [u8; SERIBUFSIZE_NORMAL] = [0; SERIBUFSIZE_NORMAL];
         

        
        loop {
            tokio::select! {
                // If you run this example without `biased;`, the polling order is
                // psuedo-random, and the assertions on the value of count will
                // (probably) fail.
                biased;
                _ = self.notify_shutdown_com_x.recv() => {

                    return  ;
                }

               
                  // some regular task
                  _= interval_3s.tick()=>{
                    
                    if com_stat_send_index%4==0{
                        let mut bx_com_info=Box::new(comdata::ComStat::default());
                        *bx_com_info=self.run_data.comstat.clone();
                        self.to_main_x_tx.try_send( glb::ExData::ComSelfInfo(self.com_index,bx_com_info)) ;

                    }

             
               
                    
                }
              

                // recv  upper msg then send to server 
                Some(out_info)=self.to_com_5_rx.recv() =>{
                     
                }
                // regular send read cmd to server 
                 _= interval.tick() =>{
                

                }
                //wait for server send back the send back to upper
                res = self.port.next() =>{
                    if res.is_some() {
                         

                        //println!("get a frame back");
                       
                        if let Some(Ok(recv_bytes))=res{
                                

                         }
                         



                    }else{
                        println!("rs232 closed  should not happen");
                    }
                }
            }
        }
    }
}

for add the reconnect func, someone other told me use like this

#[pin_project]
struct MaybeTcpStream(#[pin] Option<TcpStream>);

impl AsyncRead for MaybeTcpStream {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context,
        buf: &mut ReadBuf,
    ) -> Poll<std::io::Result<()>> {
        let stream = self.project().0.as_pin_mut();
        match stream {
            Some(stream) => stream.poll_read(cx, buf),
            None => Poll::Pending,
        }
    }
}

but use the wrapper ,when reconnect ,
**sorry for my pool English **
Thank you for reading .Wish you all the best .

Let me clarify whether I understood you correctly or not:

  1. You have a TCP connection to a server that you need to reconnect to on demand.
  2. A separate thread performs some work separately, and if it sends you any data, this data has to be forwarded to the server for processing.
  3. The server does some work as well, and if any data arrives from it, you want to forward it to another thread once again to process it separately.

If that's the case, you need several moving pieces:

  1. a TCP stream
  2. a Receiver to process data from the server
  3. a Receiver to process and forward data to the server

Two mpsc channels will work just fine here - one will take care of the "from" part and the other one - of the "to". If you want only one thread to take care of both receiving the data "from" the server and the work that needs to forward something "to" the server afterwards, things might look something like this:

Example
use tokio::sync::mpsc::{ Sender, Receiver, channel };
use tokio::net::TcpStream;
use tokio::io::AsyncWriteExt;

#[tokio::main]
async fn main() {

    // sender / receiver - for data received from the server
    let (from_tcp_sr, from_tcp_rr) = channel(100);
    
    // sender / receiver - for data to send to the server
    let (to_tcp_sr, to_tcp_rr) = channel(100);
    
    // spawn a worker thread
    let work = tokio::spawn(process(to_tcp_sr, from_tcp_rr));
    
    // spawn a connector
    let tcp = tokio::spawn(connect(from_tcp_sr, to_tcp_rr));
    
    // listen for an interruption
    tokio::signal::ctrl_c().await;
    // before exiting
    work.abort();
    tcp.abort();
}

struct Data;

async fn process(sender_to_tcp: Sender<Data>, 
    receiver_from_tcp: Receiver<Data>) {
        
    // use tokio::select!
    
    // and either forward to TCP the results of your async work
    // or get data from TCP and do something with it 
    
    // in a loop
}

async fn connect(sender_to_worker: Sender<Data>, 
    receiver_from_worker: Receiver<Data>) {
    
    loop {
        match TcpStream::connect("127.0.0.1:8080").await { 
            Ok(stream) => {
            
                // use tokio::select!
                
                // and either get data from the stream
                // forwarding it to worker, as needed
                
                // or get data from the worker
                // and forward it to the server
                
                // on TCP connection issues, fall through
                // to the timer for a reconnection attempt in 30 secs
            }
            Err(_) => ()
        }
        let secs_30 = core::time::Duration::from_secs(30);
        tokio::time::sleep(secs_30).await;
    }
}

Not sure what you were planning to achieve with 3 different intervals, each set to different time, to be honest. Just use plain loops, don't make your own life harder. Keep it simple.

1 Like

I was flattered to be answered so seriously by someone I didn't know each other. As you said Keep it simple, what I was struggling with was trying to solve the problem in a separate task. It seems unnecessary now. Thank you again, my English is very poor. I am from China and I am a novice to rust. Welcome to travel to China and experience different cultures. I wish you and your family all the best.

2 Likes

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.