Running tokio runtime

I figured out how to run the tokio runtime on the main thread:

use std::time::Duration;
use tokio::{runtime::Builder, sync::oneshot::error::TryRecvError, task::yield_now, time::sleep};

fn main() {
    println!("hello");
    let rt = Builder::new_current_thread().enable_all().build().unwrap();
    let (tx, mut rx) = tokio::sync::oneshot::channel::<()>();

    rt.spawn(async {
        println!("async, before sleep");
        sleep(Duration::from_secs(2)).await;
        println!("async, after sleep");
        tx.send(()).unwrap();
        println!("async, after send");
        sleep(Duration::from_secs(0)).await;
        println!("async, after second sleep, should not print");
    });

    loop {
        match rx.try_recv() {
            Ok(_) => break,
            Err(TryRecvError::Empty) => (), // do nothing
            Err(TryRecvError::Closed) => panic!("channel closed"),
        }

        rt.block_on(async {
            yield_now().await;
        });
    }
    drop(rt);
    println!("after runtime drop");
}

I think your code is correct. (edit: I think it still consumes CPU time unnecessarily, see update below)

However, note that tokio::sync::oneshot::Receiver allows you to asynchronously wait for the value, even though there is no recv method. Instead, oneshot::Receiver is a Future (see implementation) that can be directly awaited.

So your code can be simplified to:

 use std::time::Duration;
-use tokio::{runtime::Builder, sync::oneshot::error::TryRecvError, task::yield_now, time::sleep};
+use tokio::{runtime::Builder, time::sleep};
 
 fn main() {
     println!("hello");
     let rt = Builder::new_current_thread().enable_all().build().unwrap();
-    let (tx, mut rx) = tokio::sync::oneshot::channel::<()>();
+    let (tx, rx) = tokio::sync::oneshot::channel::<()>();
 
     rt.spawn(async {
         println!("async, before sleep");
         sleep(Duration::from_secs(2)).await;
         println!("async, after sleep");
         tx.send(()).unwrap();
         println!("async, after send");
         sleep(Duration::from_secs(0)).await;
         println!("async, after second sleep, should not print");
     });
 
-    loop {
-        match rx.try_recv() {
-            Ok(_) => break,
-            Err(TryRecvError::Empty) => (), // do nothing
-            Err(TryRecvError::Closed) => panic!("channel closed"),
-        }
-
-        rt.block_on(async {
-            yield_now().await;
-        });
-    }
+    rt.block_on(async {
+        match rx.await {
+            Ok(_) => (),
+            Err(_) => panic!("channel closed"),
+        }
+    });
     drop(rt);
     println!("after runtime drop");
 }

(Playground)

This way, you can avoid the loop and the yield_now.


Or yet shorter:

-    rt.block_on(async {
-        match rx.await {
-            Ok(_) => (),
-            Err(_) => panic!("channel closed"),
-        }
-    });
+    rt.block_on(rx).expect("channel closed");
 }

(Playground)


Update:

Note that in your code, this loop will never sleep:

    loop {
        match rx.try_recv() {
            /* … */
        }
        rt.block_on(async {
            yield_now().await;
        });
    }

That is, even when all tasks are waiting for I/O (or for a timeout), the loop keeps calling try_recv over and over, so it keeps a CPU core busy the whole time.
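
To make the difference concrete, here is a minimal sketch that uses only the standard library. It is not the tokio code from above: `std::sync::mpsc` stands in for the tokio channel, and the helper names `busy_poll` and `blocking_wait` are made up for illustration. The first function spins on `try_recv` and counts its iterations; the second parks the thread in `recv` until the value arrives.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Polls a channel with `try_recv` until a value arrives and returns how many
/// polls were needed. The loop never sleeps, so it keeps one core busy.
/// (Hypothetical helper, for illustration only.)
fn busy_poll(delay: Duration) -> u64 {
    let (tx, rx) = mpsc::channel::<()>();
    thread::spawn(move || {
        thread::sleep(delay); // placeholder for "the task is still waiting"
        let _ = tx.send(());
    });

    let mut polls = 0u64;
    loop {
        polls += 1;
        match rx.try_recv() {
            Ok(()) => return polls,
            Err(mpsc::TryRecvError::Empty) => {} // spin again immediately
            Err(mpsc::TryRecvError::Disconnected) => panic!("channel closed"),
        }
    }
}

/// Waits for the same event with a blocking `recv`. The thread is parked
/// until the sender fires, so the wait itself is a single call.
/// (Hypothetical helper, for illustration only.)
fn blocking_wait(delay: Duration) -> u64 {
    let (tx, rx) = mpsc::channel::<()>();
    thread::spawn(move || {
        thread::sleep(delay);
        let _ = tx.send(());
    });

    rx.recv().expect("channel closed");
    1 // one call was enough
}
```

Calling `busy_poll(Duration::from_millis(50))` will usually report a large number of polls, whereas `blocking_wait` makes one call and the thread sleeps in between. Awaiting the receiver inside `rt.block_on` behaves like the second variant.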

Yes, you are right, this example can be rewritten. But the idea behind this code is to run tokio on the main thread, inside the main loop of the app, which contains a lot of other synchronous code. I'm actually adding async support to an existing application, and there won't be much async code.

    loop {
        /* update everything else */

        // give the spawned tasks a chance to run
        rt.block_on(async {
            yield_now().await;
        });
    }

If the part where it says /* update everything else */

  • neither blocks for a long time (which would starve the async tasks)
  • nor does nothing for a certain time (e.g. because it is waiting for something and just returns, which would make the loop spin)

then I guess you could proceed like that. But I feel like using a separate thread might be better (depending on your particular use case).

Note that if you use two threads and one of them executes rt.block_on(rx), then the thread that drives the tokio runtime won't consume any CPU while all async tasks are waiting for something. So I don't see why you shouldn't use two threads: one to drive the asynchronous tasks (i.e. rt.block_on()) and another to do the "update everything else" part.

Example:

use std::time::Duration;
use tokio::{runtime::Builder, time::sleep};
use tokio_util::sync::CancellationToken;

fn main() {
    println!("hello");
    let rt = Builder::new_current_thread().enable_all().build().unwrap();
    let cancel = CancellationToken::new();
    let cancel2 = cancel.clone();
    let cancel3 = cancel.clone();

    rt.spawn(async move {
        println!("async, before sleep");
        sleep(Duration::from_secs(2)).await;
        println!("async, after sleep");
        cancel2.cancel();
        println!("async, after cancel");
        sleep(Duration::from_secs(0)).await;
        println!("async, after second sleep, should not print"); // not sure if this is guaranteed
    });

    let join_handle = std::thread::spawn(move || {
        rt.block_on(cancel3.cancelled()); // this will not consume CPU unnecessarily
        drop(rt); // drop the runtime explicitly so the message below is accurate
        println!("after runtime drop");
    });
    
    while !cancel.is_cancelled() {
        /* do something */
        std::thread::sleep(Duration::from_millis(10));
    }
    
    join_handle.join().unwrap();
}

(Playground)
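
If the synchronous loop also needs a result back from the other thread, one option is to check a channel once per iteration. The following is only a sketch under a few assumptions: it uses the standard library alone, the background thread is a stand-in for the thread that drives the runtime, and the name `run_main_loop` and the value `42` are made up for illustration.

```rust
use std::sync::mpsc::{self, TryRecvError};
use std::thread;
use std::time::Duration;

/// Runs a synchronous main loop that checks once per iteration for a result
/// from a background thread. Returns the result and the number of iterations.
/// (Hypothetical helper; the worker stands in for the runtime thread.)
fn run_main_loop(work_time: Duration, frame_time: Duration) -> (u32, u32) {
    let (tx, rx) = mpsc::channel::<u32>();

    let worker = thread::spawn(move || {
        thread::sleep(work_time); // placeholder for awaiting async work
        let _ = tx.send(42); // hypothetical result
    });

    let mut frames = 0;
    let result = loop {
        frames += 1;
        /* update everything else */
        match rx.try_recv() {
            Ok(value) => break value,
            // Nothing yet: pace the loop instead of spinning.
            Err(TryRecvError::Empty) => thread::sleep(frame_time),
            Err(TryRecvError::Disconnected) => panic!("worker exited without a result"),
        }
    };

    worker.join().unwrap();
    (result, frames)
}
```

Here the `try_recv` call is cheap because the loop already paces itself with `frame_time`; the waiting side never has to poll.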


Update:

In the example above, I forgot that a panic might lead to the cancellation token not being cancelled. So if you are going to use CancellationToken, it's likely better to do something like this:

     let cancel = CancellationToken::new();
-    let cancel2 = cancel.clone();
+    let cancel2_guard = cancel.clone().drop_guard();
     let cancel3 = cancel.clone();
 
     rt.spawn(async move {
         println!("async, before sleep");
         sleep(Duration::from_secs(2)).await;
         println!("async, after sleep");
-        cancel2.cancel();
+        drop(cancel2_guard);
         println!("async, after cancel");

(Playground)

If cancel2_guard is dropped (e.g. due to a panic in the async move block), the two threads will be notified and can terminate gracefully.
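
The reason a guard helps is that destructors also run while a panic unwinds the stack. Here is a small standard-library sketch of that idea. It is not how tokio_util implements its guard: `FlagOnDrop` and `flag_set_after` are hypothetical names, and an `AtomicBool` stands in for the cancellation token.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;

/// Hypothetical stand-in for a drop guard: sets the shared flag when dropped.
struct FlagOnDrop(Arc<AtomicBool>);

impl Drop for FlagOnDrop {
    fn drop(&mut self) {
        self.0.store(true, Ordering::SeqCst);
    }
}

/// Runs `work` on a new thread while that thread holds a guard, then reports
/// whether the flag was set once the thread finished (normally or by panic).
fn flag_set_after<F>(work: F) -> bool
where
    F: FnOnce() + Send + 'static,
{
    let flag = Arc::new(AtomicBool::new(false));
    let guard = FlagOnDrop(Arc::clone(&flag));

    let handle = thread::spawn(move || {
        let _guard = guard; // dropped on return and also during unwinding
        work();
    });

    let _ = handle.join(); // Err if the thread panicked; ignored here
    flag.load(Ordering::SeqCst)
}
```

Both `flag_set_after(|| {})` and `flag_set_after(|| { panic!("boom"); })` report that the flag was set. Note that this relies on unwinding: if the program is built with `panic = "abort"`, destructors do not run on panic.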
