ZMQ: future cannot be sent between threads safely

I need to continuously listen to a ZeroMQ subscription. As soon as I receive a message, I need to trigger an async action. Here's some simplified representation of what I need to achieve.

use std::str;
use std::error::Error;

#[tokio::main]
async fn main() {
    tokio::spawn(zmq_listener()); // spawn so it runs continuously without blocking

    ... // do other stuff
}

async fn zmq_listener() -> Result<(), Box<dyn Error + Send>> {
    let ctx = zmq::Context::new();

    let socket = ctx.socket(zmq::SUB).unwrap();
    socket.connect("tcp://0.0.0.0:3333").unwrap();
    socket.set_subscribe(b"mytopic").unwrap();
    loop {
        match socket.recv_multipart(0) {
            Ok(_) => trigger_action().await,
            Err(_) => {},
        };
    }
}

async fn trigger_action() {
    todo!();
}

I'm running into this compilation error.

error: future cannot be sent between threads safely
   --> src/main.rs:6:18
    |
6   |     tokio::spawn(zmq_listener());
    |                  ^^^^^^^^^^^^^^ future returned by `zmq_listener` is not `Send`
    |
    = help: within `Socket`, the trait `Sync` is not implemented for `*mut c_void`
note: future is not `Send` as this value is used across an await
   --> src/main.rs:18:34
    |
16  |         match socket.recv_multipart(0) {
    |               ------ has type `&Socket` which is not `Send`
17  |             Ok(_) => {
18  |                 trigger_action().await;
    |                                  ^^^^^ await occurs here, with `socket` maybe used later
...
21  |         };
    |          - `socket` is later dropped here
help: consider moving this into a `let` binding to create a shorter lived borrow
   --> src/main.rs:16:15
    |
16  |         match socket.recv_multipart(0) {
    |               ^^^^^^^^^^^^^^^^^^^^^^^^
note: required by a bound in `tokio::spawn`
   --> /Users/bear/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.33.0/src/task/spawn.rs:166:21
    |
164 |     pub fn spawn<F>(future: F) -> JoinHandle<F::Output>
    |            ----- required by a bound in this function
165 |     where
166 |         F: Future + Send + 'static,
    |                     ^^^^ required by this bound in `spawn`

Can someone explain why this is happening, and what would be the most elegant solution?

you first spawn is not needed.

#[tokio::main]
async fn main() {
    zmq_listener().await.unwrap();
}

spawn later the trigger_action

Please see:

Your call to recv_multipart is blocking but has no .await. This is a problem. You need to use an async library, or follow the instructions at bridging with sync code to properly isolate the non-async code.

2 Likes

hi @daaitch thanks for the quick answer

I apologize for the lack of context, I'll edit my OP so it is more clear.
Basically, I cannot block on zmq_listener. I need it to be an ongoing thread on my program, and the loop will basically run forever. That was the original motivation behind tokio::spawn.

thanks @alice, tokio::task::spawn_blocking was the answer I was looking for:

use std::error::Error;
use std::sync::{Mutex, Arc};

#[tokio::main]
async fn main() {
    tokio::spawn(zmq_listener());
    loop {}
}

async fn zmq_listener() -> Result<(), Box<dyn Error + Send>> {
    let ctx = zmq::Context::new();

    let socket = ctx.socket(zmq::SUB).unwrap();
    socket.connect("tcp://0.0.0.0:3333").unwrap();
    socket.set_subscribe(b"hashblock").unwrap();

    let socket = Arc::new(Mutex::new(socket));

    loop {
        let socket_clone = Arc::clone(&socket); // Clone Arc
        match tokio::task::spawn_blocking(move || {
            let socket = socket_clone.lock().unwrap(); // Lock the Mutex
            socket.recv_multipart(0)
        }).await {
            Ok(Ok(d)) => trigger_action(d).await,
            _ => {},
        };
    }
}

async fn trigger_action(d: Vec<Vec<u8>>) {
    todo!();
}

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.