Send messages from async to sync

The tokio::sync::mpsc documentation says: "So for sending a message from async to sync, you should use the standard library unbounded channel or crossbeam."

So I wrote the following code:

async fn subscribe_websocket_dummy(tx: std::sync::mpsc::Sender<String>) {
    tx.send("hello".to_string());
    tx.send("world".to_string());
}

#[tokio::main]
async fn main() {
    let (tx, rx) = std::sync::mpsc::channel();
    tokio::task::spawn(async move {
        _ = tokio::time::timeout(
            std::time::Duration::from_secs(60),
            subscribe_websocket_dummy(tx.clone()), // tx succeeds, tx.clone() fails
        )
        .await;
    });
    for msg in rx {
        println!("{}", msg);
    }
}

The code above does not compile; the compiler reports the following error:

error: future cannot be sent between threads safely
   --> src/main.rs:9:5
    |
9   |     tokio::task::spawn(async move {
    |     ^^^^^^^^^^^^^^^^^^ future created by async block is not `Send`
    |
    = help: the trait `Sync` is not implemented for `std::sync::mpsc::Sender<String>`
note: future is not `Send` as this value is used across an await

Passing tx compiles, but passing tx.clone() fails. Why? I really need to use this tx in multiple async functions.

Runnable code is here: Rust Playground

Hm, the above code does compile for me. Could you show the version with subscribe_websocket which doesn't compile?

EDIT: you might find the Rust Playground more convenient than replit: Rust Playground

The real function subscribe_websocket is inside a big code base, so I'm not able to paste code in this post

Could you paste its signature? If subscribe_websocket_dummy compiles but subscribe_websocket doesn't, then the answer should be in the difference between the two.

I guess the signature wouldn't help if the difference is that subscribe_websocket holds something non-Sync across an await.

I have pasted the full code in the post now.

Any suggestions for using a std::sync::mpsc::Sender in multiple async functions? Thanks.

Hi Aleksey, I pasted the full code above, which fails to compile now

You can clone tx before constructing the future (the async part) so that the cloned value gets moved into it.

async fn subscribe_websocket_dummy(tx: std::sync::mpsc::Sender<String>) {
    // send only fails if the receiver is gone; ignore that case explicitly
    let _ = tx.send("hello".to_string());
    let _ = tx.send("world".to_string());
}

#[tokio::main]
async fn main() {
    let (tx, rx) = std::sync::mpsc::channel();
    // call clone **before** async move
    let tx_clone = tx.clone();
    tokio::task::spawn(async move {
        _ = tokio::time::timeout(
            std::time::Duration::from_secs(60),
            subscribe_websocket_dummy(tx_clone), // this works now
        )
        .await;
    });
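    // drop the original sender once all clones are made,
    // otherwise the loop below never sees the channel close
    drop(tx);
    for msg in rx {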
        println!("{}", msg);
    }
}

Playground

I'm guessing this is because you have an immutable reference to the sender somewhere. You should clone it instead.

Alternatively, you can try using the Tokio mpsc channel. It has a blocking_recv method you can use in sync code. (That method was added after I wrote the sentence about blocking channels, which you quoted from the Tokio documentation.)

@RobinH Your trick works, you saved my day, thanks!

@alice I used tx.clone() everywhere, with no references. Judging from @RobinH's answer, it is the location of the tx.clone() call that matters.
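
Since the original question was about using the sender from several places: the same clone-before-move pattern scales to any number of producers. Below is a rough sketch using plain std threads so that it has no dependencies (fan_in and the message text are made-up names). With tokio::task::spawn and async move the shape is the same: clone outside the block, then move the clone in.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical helper: starts `producers` workers that each own a clone of
// the sender, then collects everything they send.
fn fan_in(producers: usize) -> Vec<String> {
    let (tx, rx) = mpsc::channel::<String>();
    let mut handles = Vec::new();

    for id in 0..producers {
        // Clone *before* the `move` closure so the closure owns its own
        // sender instead of borrowing the original.
        let tx_clone = tx.clone();
        handles.push(thread::spawn(move || {
            // `send` only fails if the receiver has been dropped.
            let _ = tx_clone.send(format!("hello from producer {}", id));
        }));
    }

    // Drop the original sender; the receiver only sees the channel close
    // once every sender, including this one, is gone.
    drop(tx);

    let mut received: Vec<String> = rx.iter().collect();
    for handle in handles {
        handle.join().expect("producer thread panicked");
    }
    // Arrival order is not deterministic, so sort for a stable result.
    received.sort();
    received
}

fn main() {
    for msg in fan_in(3) {
        println!("{}", msg);
    }
}
```

Forgetting the drop(tx) call is an easy mistake here: the receiving loop then blocks forever, because one sender is still alive.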
