How to wrap blocking code that returns multiple values in Tokio?

I am trying to wrap a synchronous MQTT client library using Tokio. The code needs to continuously receive messages via std::sync::mpsc channel and send them into the async code. I understand how to use spawn_blocking for wrapping a code that returns a single value. But how this can be applied to wrap a loop that is continuously receiving messages from std::sync::mpsc channel?

Here is the code that I use to send messages into the channel.

let (mut tx, mut rx) = std::sync::mpsc::channel();

tokio::spawn(async move {
            let mut mqtt_options = MqttOptions::new("bot", settings.mqtt.host, settings.mqtt.port);
            let (mut mqtt_client, notifications) = MqttClient::start(mqtt_options).unwrap();

            mqtt_client.subscribe(settings.mqtt.topic_name, QoS::AtLeastOnce).unwrap();

            tokio::task::spawn_blocking(move || {
                println!("Waiting for notifications");
                for notification in notifications {
                    match notification {
                        rumqtt::Notification::Publish(publish) => {
                            let payload = Arc::try_unwrap(publish.payload).unwrap();
                            let text: String = String::from_utf8(payload).expect("Can't decode payload for notification");
                            println!("Recieved message: {}", text);
                            let msg: Message = serde_json::from_str(&text).expect("Error while deserializing message");
                            println!("Deserialized message: {:?}", msg);
                            println!("{}", msg);
                            tx.send(msg);
                        }
                        _ => println!("{:?}", notification)
                    }
                }
            });
    });

But I am unsure how should I use tokio API to receive these messages inside another async closure.

tokio::task::spawn(async move || {
    // How to revieve messages via `rx` here? I can't use tokio::sync::mpsc channels 
    // since the code that sends messages is blocking.
});

In this case I recommend you spawn a thread that forwards any messages to a Tokio channel.

Thanks, I have considered this option too, but you can't use tokio::sync::mpsc::channel inside a spawn_blocking closure since Sender must be called inside an async context.

let (mut tok_tx, tok_rx) = tokio::sync::mpsc::channel(10);

    tokio::spawn(async move {
            println!("Conntcting to MQTT server at {}:{}/{}", settings.mqtt.host, settings.mqtt.port, settings.mqtt.topic_name);
            let mut mqtt_options = MqttOptions::new("weather_station_bot", settings.mqtt.host, settings.mqtt.port);
            // .set_security_opts(SecurityOptions::UsernamePassword(String::from("dubovikov.kirill@gmail.com"), String::from("12345")));
            let (mut mqtt_client, notifications) = MqttClient::start(mqtt_options).unwrap();

            mqtt_client.subscribe(settings.mqtt.topic_name, QoS::AtLeastOnce).unwrap();

            tokio::task::spawn_blocking(move || {
                println!("Waiting for notifications");
                for notification in notifications {
                    match notification {
                        rumqtt::Notification::Publish(publish) => {
                            let payload = Arc::try_unwrap(publish.payload).unwrap();
                            let text: String = String::from_utf8(payload).expect("Can't decode payload for notification");
                            println!("Recieved message: {}", text);
                            let msg: Message = serde_json::from_str(&text).expect("Error while deserializing message");
                            println!("Deserialized message: {:?}", msg);
                            println!("{}", msg);
                            tok_tx.send(msg).await;
                        }
                        _ => println!("{:?}", notification)
                    }
                }
            });
    });

The compiler gives an error in this case:

    |
94  |             tokio::task::spawn_blocking(move || {
    |                                         ------- this is not `async`
...
106 |                             tok_tx.send(msg).await;
    |                             ^^^^^^^^^^^^^^^^^^^^^^ only allowed inside `async` functions and blocks

In this case I'd probably just spawn an ordinary thread with std::thread::spawn. The spawn_blocking feature is intended for blocking operations that eventually finish.

As for the channel, you can either use the unbounded channel, which has a non-async send method, or you can use futures::executor::block_on on the send call. Using futures' block_on like that is usually discouraged, but on certain elements such as channels and Tokio's JoinHandle it is ok.

1 Like

Thanks a lot! I did now knew that unbounded_channel could be used in non async settings. Do you know why unbounded_channel has non-async send method, but regular channel does not?

Yes. It's because bounded channels might have to wait for a receiver to receive an item, whereas this is never necessary for unbounded channels.

This topic was automatically closed 90 days after the last reply. New replies are no longer allowed.