Tokio: From async to sync and back to async (block_on vs spawn_blocking)

I want a background task (or thread, or something like that) to repeatedly read from this blocking function:

soapysdr::RxStream::read

And then feed the result into a radiorust::flow::Sender.

Currently, I do it as follows:

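let join_handle = spawn_blocking(move || {
    // Handle to the runtime that spawned this blocking task,
    // used below to call back into async code.
    let rt = runtime::Handle::current();
    let mut buf_pool = ChunkBufPool::<Complex<f32>>::new();
    let mut result = Ok(());
    // Loop until the abort signal changes or its sender is dropped.
    while !abort_recv.has_changed().unwrap_or(true) {
        let mut buffer = buf_pool.get();
        buffer.resize_with(mtu, Default::default);
        // Blocking read; yields the number of samples received.
        let count = match rx_stream.read(&[&mut buffer], 1000000) {
            Ok(x) => x,
            Err(err) => {
                result = Err(err);
                break;
            }
        };
        buffer.truncate(count);
        // Back to async: block this thread until the chunk has been sent.
        if let Err(_) = rt.block_on(sender.send(Samples {
            sample_rate,
            chunk: buffer.finalize(),
        })) {
            break;
        }
    }
    // Always deactivate the stream, but keep the first error if there was one.
    if let Err(err) = rx_stream.deactivate(None) {
        if result.is_ok() {
            result = Err(err);
        }
    }
    SoapySdrRxRetval { rx_stream, result }
});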

(Source on GitHub)

Notice that I'm in an async context when I call spawn_blocking.

So I go from async to sync, and then back to async. This feels a bit awkward. Is it the right way to go?

I wondered if maybe it's better to do something like this:

-let join_handle = spawn_blocking(move || {
-    let rt = runtime::Handle::current();
+let join_handle = spawn(async move {
     let mut buf_pool = ChunkBufPool::<Complex<f32>>::new();
     let mut result = Ok(());
     while !abort_recv.has_changed().unwrap_or(true) {
         let mut buffer = buf_pool.get(); 
         buffer.resize_with(mtu, Default::default);
-        let count = match rx_stream.read(&[&mut buffer], 1000000) {
+        let (read, tmp) = spawn_blocking(move || {
+            let read = rx_stream.read(&[&mut buffer], 1000000);
+            (read, (rx_stream, buffer))
+        }).await.unwrap();
+        rx_stream = tmp.0;
+        buffer = tmp.1;
+        let count = match read {
             Ok(x) => x,
             Err(err) => {
                 result = Err(err);
                 break;
             }
         };
         buffer.truncate(count);
-        if let Err(_) = rt.block_on(sender.send(Samples {
+        if let Err(_) = sender.send(Samples {
             sample_rate,
             chunk: buffer.finalize(),
-        })) {
+        }).await {
             break;
         }

This way, I would only go from async to sync, but not back. However, I wonder if this comes with more overhead, because spawn_blocking is now called inside the while loop. How efficient is Tokio at spawning threads for blocking operations?

What's the best way to go?

From the other thread on CPU-heavy computation:

In your post, you write:

If a blocking operation keeps running forever, you should run it on a dedicated thread. […] Running such a task on either of the two other thread pools is a problem, because it essentially takes away a thread from the pool permanently.

I would thus conclude that my current approach isn't correct. But I'm not sure whether it's better to use std::thread::spawn (which is what your blog post suggests), or whether I can use very small, short-lived spawn_blocking invocations (as in my diff above).

Update: For now, I decided to use std::thread::spawn with a long-lived thread. (GitHub commit)

In your post, you give the following example:

For example consider a thread that manages a database connection using a channel to receive database operations to perform.

I would assume that receiving from that channel in your example would require using Handle::block_on.

I would generally consider both approaches acceptable. You are correct that you need to use std::thread::spawn in the first version.

Tokio channels have blocking versions of the methods, so this isn't necessary for this particular case, but there isn't anything wrong with using block_on from blocking threads.

One problem I face with using a std::thread is that the whole process exits when the main thread returns, which terminates all other threads without unwinding them:

fn main() {
    std::thread::spawn(move || {
       println!("Thread started.");
       std::thread::sleep(std::time::Duration::from_millis(200));
       println!("Thread about to finish.");
    });
    std::thread::sleep(std::time::Duration::from_millis(100));
    println!("End of main.");
}

(Playground)

Output:

Thread started.
End of main.

This also means that drop handlers are not executed:

struct DropGuard;

impl Drop for DropGuard {
    fn drop(&mut self) {
        println!("Cleanup work done.")
    }
}

fn main() {
    std::thread::spawn(move || {
       println!("Thread started.");
       let _guard = DropGuard;
       // Uncomment the following line, and cleanup work will be skipped!!
       //std::thread::sleep(std::time::Duration::from_millis(200));
       println!("Thread about to finish.");
    });
    std::thread::sleep(std::time::Duration::from_millis(100));
    println!("End of main.");
}

(Playground)

Uncommenting the sleep line above prevents the drop handler from running.

In my application scenario, this causes my hardware SDR to keep transmitting an RF carrier indefinitely and to refuse reinitialization until I unplug it from the USB port. (I don't blame Rust for this; the drivers are poorly written. But it still means I need to run the drop handlers properly.)
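With a plain std thread, the drop handler only runs if something waits for the thread to finish before main returns. A minimal sketch of that idea using only the standard library follows; the AtomicBool stop flag, the shared event log, and the run helper are hypothetical names introduced for illustration, and the sleep loop stands in for the blocking read loop.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

// Records its cleanup in a shared log so the order of events can be inspected.
struct DropGuard(Arc<Mutex<Vec<&'static str>>>);

impl Drop for DropGuard {
    fn drop(&mut self) {
        self.0.lock().unwrap().push("cleanup");
    }
}

// Hypothetical helper: returns the events in the order they happened.
fn run() -> Vec<&'static str> {
    let log = Arc::new(Mutex::new(Vec::new()));
    let stop = Arc::new(AtomicBool::new(false));

    let worker = {
        let (log, stop) = (Arc::clone(&log), Arc::clone(&stop));
        thread::spawn(move || {
            let _guard = DropGuard(Arc::clone(&log));
            log.lock().unwrap().push("started");
            // Stand-in for the blocking read loop; it checks the stop flag
            // between (short) blocking calls.
            while !stop.load(Ordering::Relaxed) {
                thread::sleep(Duration::from_millis(10));
            }
            log.lock().unwrap().push("finishing");
            // `_guard` is dropped here, before the thread exits.
        })
    };

    thread::sleep(Duration::from_millis(50));
    // Ask the thread to stop, then wait for it. The join is what guarantees
    // that the thread's destructors have run before `main` returns.
    stop.store(true, Ordering::Relaxed);
    worker.join().unwrap();
    log.lock().unwrap().push("end of main");

    let events = log.lock().unwrap().clone();
    events
}

fn main() {
    println!("{:?}", run());
}
```

This only works if the blocking call returns regularly (for example because of a read timeout), so that the thread gets a chance to notice the stop flag.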

I thus wonder if it's better to not use std::thread::spawn but instead use the alternative approach of tokio::task::spawn in combination with tokio::task::spawn_blocking around the blocking read/write operation. Apparently, ending a #[tokio::main] async main function doesn't terminate the program immediately:

#[tokio::main]
async fn main() {
    let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
    tx.send(1).unwrap();
    tx.send(2).unwrap();
    tokio::task::spawn(async move {
       println!("Task started.");
       // We can do a lot:
       tokio::task::spawn_blocking(move || {
           std::thread::sleep(std::time::Duration::from_millis(200));
       });
       tokio::task::yield_now().await;
       println!("Received: {}", rx.recv().await.unwrap());
       tokio::time::sleep(std::time::Duration::from_millis(0)).await;
       tokio::task::spawn_blocking(move || {
           std::thread::sleep(std::time::Duration::from_millis(200));
       });
       tokio::task::yield_now().await;
       println!("Received: {}", rx.recv().await.unwrap());
       // But if we do this, we get canceled:
       //tokio::time::sleep(std::time::Duration::from_millis(200)).await;
       println!("Task about to finish cleanly!");
    });
    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
    println!("End of main.");
}

(Playground)

Output:

Task started.
Received: 1
Received: 2
Task about to finish cleanly!
End of main.

Yet sometimes tasks do get aborted:

        // But if we do this, we get canceled:
-       //tokio::time::sleep(std::time::Duration::from_millis(200)).await;
+       tokio::time::sleep(std::time::Duration::from_millis(200)).await;

(Playground)

Output:

Task started.
Received: 1
Received: 2
End of main.

Why does the task get aborted on tokio::time::sleep but not on other await points?

The Tokio documentation says:

Shutdown

Shutting down the runtime is done by dropping the value. The current thread will block until the shut down operation has completed.

  • Drain any scheduled work queues.
  • Drop any futures that have not yet completed.
  • Drop the reactor.

Once the reactor has dropped, any outstanding I/O resources bound to that reactor will no longer function. Calling any method on them will result in an error.

I don't really understand what "work queues" are in this context, which futures get dropped, or what a reactor is. How does this relate to the example above?

Tasks spawned with spawn_blocking cannot be aborted, so they will keep running until they return on their own.

(Unless they are aborted immediately after spawning, in which case Tokio might not start them in the first place.)

And how about tasks spawned with spawn? I don't understand yet under which circumstances they get aborted.

In particular:

Does this have to do with sleep being some sort of "I/O operation" while the others are not?

Tasks spawned with spawn are aborted at the next .await that yields, or immediately if not currently running.

The task will get aborted at a yield_now call if the abort signal is sent before the call to yield_now. As for recv, the task might not yield at the recv if a message is immediately available, and it will only get cancelled at the recv if it yields.

Ah, I just noticed I got confused because I forgot an .await in one of my examples above:

         tokio::task::spawn_blocking(move || {
             std::thread::sleep(std::time::Duration::from_millis(200));
-        });
+        }).await.unwrap();
-        tokio::task::yield_now().await;
+        tokio::task::yield_now().await; // gets aborted here
         println!("Received: {}", rx.recv().await.unwrap());
         tokio::time::sleep(std::time::Duration::from_millis(0)).await;
         tokio::task::spawn_blocking(move || {
             std::thread::sleep(std::time::Duration::from_millis(200));
-        });
+        }).await.unwrap();

(Playground)

So my confusion was simply caused by a missing await.

After testing some more, I created another example to demonstrate the problem:

use std::time::Duration;

struct Resource(&'static str);

impl Resource {
    fn new(name: &'static str) -> Self {
        println!("Resource {name} aquired.");
        Self(name)
    }
}

impl Drop for Resource {
    fn drop(&mut self) {
        let name = self.0;
        println!("### Resource {name} released.");
    }
}

#[tokio::main]
async fn main() {
    std::thread::spawn(move || {
        let _resource = Resource::new("A");
        // this will sleep until the process gets terminated
        // and the drop handler is never executed:
        std::thread::sleep(Duration::from_millis(300));
        println!("End of task.");
    });
    tokio::task::spawn(async move {
        let _resource = Resource::new("B");
        tokio::time::sleep(Duration::from_millis(200)).await;
        // execution doesn't reach this point:
        println!("End of task B.");
        // but the drop handler is executed anyway
    });
    tokio::task::spawn(async move {
        let _resource = Resource::new("C");
        tokio::task::spawn_blocking(|| {
            std::thread::sleep(Duration::from_millis(200));
        }).await.unwrap();
        // execution doesn't reach this point:
        println!("End of task C.");
        // but the drop handler is executed anyway
    });
    tokio::time::sleep(Duration::from_millis(100)).await;
    println!("End of main.");
}

(Playground)

Output:

Resource C aquired.
Resource B aquired.
Resource A aquired.
End of main.
### Resource B released.
### Resource C released.

The drop handler for Resource A never gets executed.

I feel like the short-lived spawn_blocking invocations are the better way to go, unless there's some other way to make sure drop handlers always execute.
