Crossing sync/async boundaries with channels

As a long-running hobby project, I'm building a music server to stream my personal music collection.

One of the next features I'm looking to implement is automatically adding and organizing music dropped into a specific folders. This includes watching a specific folder for filesystem notifications (using notify-rs) and moving each new file into a subfolder structure according to artist/album/etc.

Since notify-rs and file system operations all live inside the realm of non-async, blocking operations, I'm currently running this background task using tokio::task::spawn_blocking. However, other subsystems running inside the tokio runtime (sending new music data to the full text search server, sending notifications to the web client, etc.) of the music server also need to be notified of these changes. As such, I planned on notifying them using a tokio::sync::broadcast channel.

However, I'm not sure how to use an async channel by sending data from a blocking task and consuming them in non-blocking tasks.

I believe the following example should most likely work, but I'm not sure how safe it is to use futures::executor::block_on?

let (tx, mut rx1) = tokio::sync::broadcast::channel(10);
let mut rx2 = tx.subscribe();

tokio::task::spawn(async move {
    assert_eq!(rx1.recv().await.unwrap(), "Hello!");
    assert_eq!(rx1.recv().await.unwrap(), "World!");

    // Send notifications to web client etc.
});

tokio::task::spawn(async move {
    assert_eq!(rx2.recv().await.unwrap(), "Hello!");
    assert_eq!(rx2.recv().await.unwrap(), "World!");

    // Send data to search etc.
});

tokio::task::spawn_blocking(move {
    // Handle file system notifcations
    // ...
    // Handle file system operations

    futures::executor::block_on(tx.send("Hello!")).unwrap();
    futures::executor::block_on(tx.send("World!")).unwrap();
})

As I understand it, tokio::spawn_blocking should spawn a separate thread where it is safe to call blocking code, and futures::executor::block_on will run a future to completion on the current thread (aka the one tokio spawned to take care of the blocking code), which should by definition then be safe to block on as well?

Is this an acceptable pattern? Is there a better way to do this?

Correct. Calling block_on from a spawned blocking thread is fine. Another thing you can do which might be easier is to wrap the entire thing in an async block, and then block_on that, since writing .await is easier than futures::executor::block_on(. And you're allowed to block inside this async block because it's spawned on a blocking thread.

But I'm not sure why you need to block on it in the first place - broadcast::Sender::send is not an async function.

If the send method had been async, then yes, you can use futures::executor::block_on to call it, and this is indeed ok inside spawn_blocking. However, this is unnecessary with all types of channels available in tokio::sync because either their send method is not async, or they provide a blocking_send method specifically for this use-case.

Of course, seems like I was having one hell of a brainfart here :man_facepalming:.

Either way, thanks for confirming how this would work at least!

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.