As a long-running hobby project, I'm building a music server to stream my personal music collection.
One of the next features I'm looking to implement is automatically adding and organizing music dropped into a specific folders. This includes watching a specific folder for filesystem notifications (using notify-rs
) and moving each new file into a subfolder structure according to artist/album/etc.
Since notify-rs
and file system operations all live inside the realm of non-async, blocking operations, I'm currently running this background task using tokio::task::spawn_blocking
. However, other subsystems running inside the tokio runtime (sending new music data to the full text search server, sending notifications to the web client, etc.) of the music server also need to be notified of these changes. As such, I planned on notifying them using a tokio::sync::broadcast
channel.
However, I'm not sure how to use an async channel by sending data from a blocking task and consuming them in non-blocking tasks.
I believe the following example should most likely work, but I'm not sure how safe it is to use futures::executor::block_on
?
let (tx, mut rx1) = tokio::sync::broadcast::channel(10);
let mut rx2 = tx.subscribe();
tokio::task::spawn(async move {
assert_eq!(rx1.recv().await.unwrap(), "Hello!");
assert_eq!(rx1.recv().await.unwrap(), "World!");
// Send notifications to web client etc.
});
tokio::task::spawn(async move {
assert_eq!(rx2.recv().await.unwrap(), "Hello!");
assert_eq!(rx2.recv().await.unwrap(), "World!");
// Send data to search etc.
});
tokio::task::spawn_blocking(move {
// Handle file system notifcations
// ...
// Handle file system operations
futures::executor::block_on(tx.send("Hello!")).unwrap();
futures::executor::block_on(tx.send("World!")).unwrap();
})
As I understand it, tokio::spawn_blocking
should spawn a separate thread where it is safe to call blocking code, and futures::executor::block_on
will run a future to completion on the current thread (aka the one tokio spawned to take care of the blocking code), which should by definition then be safe to block on as well?
Is this an acceptable pattern? Is there a better way to do this?