Tonic gRPC client with infinite synchronous loop

I have a rust application that uses the tonic crate and acts as a gRPC client. The app also sets up a ZMQ subscriber instance, and subscribes to a couple different topics for incoming data. Once the app gets data one those topics, it formats it and sends it to the gRPC server via tonic.

The tonic crate uses the tokio stack, therefore I see most examples using the tokio runtime #[tokio::main]. My question is, since the overall flow of data receiving via ZMQ, formatting, etc should be synchronous, how would I set up my application since the gRPC send needs a tokio runtime?

Right now I have it so in my #[tokio::main] main() function I just loop indefinitely and block on the ZMQ socket.recv(), and then send via gRPC but I know this probably isn't the correct way to do this

#[tokio::main]
async fn main()  -> Result<(), Box<dyn std::error::Error>> {
   // Initialize ZMQ, gRPC client, variables, ect.

    loop {
        let mut topic_message = Message::new();
        subscriber.recv(&mut topic_message, 0)?;

        // Format data, do some calculations on it

       // Send via gRPC, I am not doing anything with the response
       tonic::Request::new(data);
       let _response = client?.send(request).await?;
    }
}

I am still learning tokio so any and all suggestions on how to handle this would be much appreciated!

You should not run synchronous code in Tokio. See these articles for alternatives:

What if I in my app I had a dedicated thread that subscribes to the ZMQ topics and sends to a tokio mpsc channel? And then a tokio::spawn thread can recv that data through the channel and process asynchronously and send via tonic?

use tokio::sync::mpsc::unbounded_channel;
use std::thread;
use zmq::Message;

#[tokio::main]
async fn main()  -> Result<(), Box<dyn std::error::Error>> {
   // Initialize ZMQ, gRPC client, variables, ect.

   // Set up mpsc channel between zmq subscriber handle and tonic handle (unbounded now for testing)
   let (sender, mut receiver) = unbounded_channel::<Vec<u8>>();

   let tonic_handle = tokio::spawn(async move {
        while let Some(data) = receiver.recv().await {
             // Format data, put together protobuf

            // Send via gRPC, I am not doing anything with the response
            tonic::Request::new(data);
            let _response = client?.send(request).await?;
        }
   });


    let zmq_handle = thread::spawn(move || {
       loop {
           // Receive latest message published to topic
           let mut message = Message::new();
           subscriber.recv(&mut message, 0)?;

           // Format data, do some calculations on it
           data = message.to_vec();

          // Send to mpsc channel
          sender.send(data);
       }
    });

    // await the tonic handle task
    tonic_handle.await?;

    // Join the zmq subscriber thread
    zmq_handle.join()?;
}

That would be fine.

1 Like