How to handle reconnect & shutdown correctly in Tokio?

Hi,

I'm working on an async message processing service in Rust and am currently trying to combine reconnect and shutdown handling.

The previous version handled shutdown signals nicely, but didn't reconnect after an error. Specifically, the main loop was implemented like so:

pub async fn run(
    self,
    signal: impl Future<Output = ()> + Send + 'static,
) -> Result<(), MessageProcessingError> {
    // Pin the signal future so that `&mut signal_future` can be polled in select!.
    // https://docs.rs/tokio/latest/tokio/macro.pin.html#examples
    let signal_future = signal;
    pin!(signal_future);

    // Somehow we need to keep the consumer alive,
    // otherwise the consumer closes the connection with an error:
    let consumer = fluvio::consumer(&self.channel_topic, 0)
        .await
        .expect("Failed to create a consumer for data topic");

    // Creates a stream of messages from the topic.
    let mut stream = consumer
        .stream(Offset::end())
        .await
        .expect("[QDGW/Service:run]: Failed to create a stream");

    loop {
        select! {
            // Handle shutdown signal
            _ = &mut signal_future => {
                break;
            }

            record = stream.next() => {
                if let Some(res) = record {
                    match res {
                        Ok(record) => {
                            match self.handle_record(&record).await {
                                Ok(()) => {},
                                Err(e) => {
                                    return Err(e);
                                }
                            }
                        },
                        Err(e) => {
                            return Err(MessageProcessingError(e.to_string()));
                        }
                    }
                }
            } // end stream.next()
        } // end select
    } // end loop

    Ok(())
}

Link to full code.

This works as expected, except that it does not handle connection loss. However,
the service somehow loses its connection once in a while, so a reconnect is needed.

My current attempt to handle reconnection looks like this:

pub async fn run(
    self,
    signal: impl Future<Output = ()> + Send + 'static,
) -> Result<(), MessageProcessingError> {
    // Pin the signal future so that `&mut signal_future` can be polled in select!.
    // https://docs.rs/tokio/latest/tokio/macro.pin.html#examples
    let signal_future = signal;
    pin!(signal_future);

    // Handle reconnecting to the cluster if the connection fails.
    loop {
        // Connect to the Fluvio cluster.
        let Ok(client) = fluvio::consumer(&self.channel_topic, 0).await else {
            println!("[QDGW/Service:run]: Could not connect to Fluvio cluster, retrying");
            sleep(RETRY).await;
            continue;
        };

        // Creates a stream of messages from the topic.
        let Ok(mut stream) = client.stream(Offset::end()).await else {
            println!("[QDGW/Service:run]: Failed to create stream, retrying");
            sleep(RETRY).await;
            continue;
        };

        loop {
            select! {
                record = stream.next() => {
                    if let Some(res) = record {
                        match res {
                            Ok(record) => {
                                match self.handle_record(&record).await {
                                    Ok(()) => {},
                                    Err(e) => {
                                        return Err(e);
                                    }
                                }
                            },
                            Err(_) => {
                                println!("[QDGW/Service:run]: Reconnecting stream");
                                sleep(RETRY).await;
                                // Exit inner loop and try to reconnect stream
                                break;
                            }
                        }
                    } // end if let
                } // end stream.next()
            } // end select
        } // end inner loop
    } // end outer loop

    // currently unreachable, because the signal handler is not yet implemented
    Ok(())
}

Full code on GitHub.

However, this no longer handles shutdown signals: I got too many compile errors, so I removed the shutdown handling for the time being to make the code run. I am also not certain about the reconnect handling itself. I believe a backoff strategy would be sensible in case of a full network failure.

I just cannot figure out how to handle reconnect & shutdown correctly in Tokio.

Any help on this would be appreciated.

Futures in Rust can only complete once, after which you cannot await them again. If you try to do this, e.g. if you poll signal_future in your select! statement after the shutdown signal has already been processed, the poll will most likely panic (a future created from an async fn or async block panics when it is polled after completion).
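
A minimal sketch of the point above, using only the standard library: it drives an async block to completion by hand and then polls it once more. NoopWake and second_poll_panics are names made up for this illustration, and the async block stands in for the shutdown signal.

```rust
use std::future::Future;
use std::panic::{catch_unwind, AssertUnwindSafe};
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical helper: a waker that does nothing, enough to poll by hand.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Polls an async block to completion, polls it once more, and reports
// whether that second poll panicked.
fn second_poll_panics() -> bool {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);

    let mut fut = pin!(async { 42 });

    // First poll: there is nothing to wait for, so it completes immediately.
    assert_eq!(fut.as_mut().poll(&mut cx), Poll::Ready(42));

    // Second poll of the same, already completed future.
    catch_unwind(AssertUnwindSafe(|| {
        let _ = fut.as_mut().poll(&mut cx);
    }))
    .is_err()
}

fn main() {
    println!("second poll panicked: {}", second_poll_panics());
}
```

The panic is raised by the completed future, so it shows up whenever a select! branch polls `&mut signal_future` again after that branch has already fired.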

Therefore, you will not only need to reconnect to your Fluvio cluster at the top of your outer loop, you also need to find some way to get a new signal_future for the next shutdown signal.

Yes, a backoff strategy would be sensible, but it needs to be treated as a separate problem from the issue mentioned above.
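
As a rough illustration of what such a backoff could look like, here is a small exponential schedule with an upper bound. The function name backoff_delay, the 500 ms base and the 30 s cap are assumptions chosen for the example, and jitter is left out to keep it short.

```rust
use std::time::Duration;

// Delay before retry number `attempt` (0-based): `base` doubled once per
// attempt, capped at `max`. Overflow falls back to the cap.
fn backoff_delay(attempt: u32, base: Duration, max: Duration) -> Duration {
    let factor = 2u32.checked_pow(attempt).unwrap_or(u32::MAX);
    base.checked_mul(factor).unwrap_or(max).min(max)
}

fn main() {
    let base = Duration::from_millis(500);
    let max = Duration::from_secs(30);
    for attempt in 0..8 {
        println!("attempt {attempt}: wait {:?}", backoff_delay(attempt, base, max));
    }
}
```

In the reconnect loop, the fixed sleep(RETRY) would then become a sleep for backoff_delay(attempt, base, max), with attempt incremented on every failure and reset to 0 after a successful connect.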

Thank you,

I learned that the hard way, as I got a lot of those Tokio runtime panics. It seems, though, that the actual cause of the connection drop is unrelated to the connection handling in Tokio; somehow the Fluvio cluster just drops the connection randomly. I am still looking into this fairly bizarre issue, but some Fluvio folks could not reproduce the connection drop with the exact same code I've shared, so I have reason to believe there might be something else at play.
