Running tokio runtime

I currently have some code, that runs tokio runtime on a single separate thread:

        let thread_handle = std::thread::spawn(move || {

            // create tokio runtime and notify main thread
            let rt = Builder::new_current_thread().enable_all().build().unwrap();
            let (lock, cvar) = &*runtime_handle_lock2;
            let mut handle_lock = lock.lock().unwrap();
            handle_lock.replace(rt.handle().clone());
            drop(handle_lock);
            cvar.notify_one();

            // run tokio runtime
            rt.block_on(async {
                while !stop_flag2.load(std::sync::atomic::Ordering::Relaxed) {
                    tokio::task::yield_now().await;
                }
            });
        });

Then I can use tokio Handle to spawn tasks on it from other threads:

    let _enter_guard = rt.enter();
    tokio::spawn(async { /*async code here*/ });

It works, but what bothers me here is that this thread will never sleep even if there are no tasks running on tokio. Is there a better way to keep tokio running on the thread?

Also, I would actually prefer to not have a separate thread, but to run the runtime on the same thread. Something like this:

// in main application loop:
rt.update(); // will run all the async code here

Is it possible?

I don't think it's possible to "wait" on atomics. Instead, you'll need your task to be notified on change. You can use some of the channels in tokio::sync for example.

If you have your main loop running endlessly anyway, it might be possible to poll the future returned by block_on with a dummy Context/Waker (using Wake and Waker::from to avoid unsafe). But I never did anything like that. Maybe there is an easier way to "poll" a future without waker?


There is now_or_never, but I wonder if there's something like now_or_later.

Update: I found futures::task::noop_waker, which is used by now_or_never internally.

Sorry for the question not being clear enough. I just pasted the code without removing unnecessary details.

The relevant code example will be just:

            std::thread::spawn(move || {

            let rt = Builder::new_current_thread().enable_all().build().unwrap();
            rt.block_on(async {
                loop {
                    tokio::task::yield_now().await;
                }
            });
        });

This async task with inifinite loop is used to keep tokio runtime alive so that it is possible to add other tasks to it later. But it looks like this infinite loop will eat processor time and cause battery draining on mobile.
I think I can just add sleep inside the loop and that would be enough.

                loop {
                    tokio::task::yield_now().await;
                    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
                }

But maybe there is a better solution. For example, it can be possible to have tokio runtime alive and running without a such fake task at all.

Maybe what you're looking for is std::future::pending. Calling std::future::pending::<()>().await will wait indefinitely without consuming CPU time.

I am really new to rust async/Futures. But isn't it that block_on blocks until the future is ready? So after block_on return there is no need to poll this future again.

Also, I need to poll not only the future of this fake task but all the futures added to tokio runtime from different parts of the program.

Yes, block_on takes a future and will try to poll it until it's ready. It's the "main loop" of the async executor (I hope I got terminology right here).

You could do this:

fn main() {
    let rt = tokio::runtime::Runtime::new().unwrap();
    // This will block the current thread indefinitely without consuming CPU:
    rt.block_on(async move {
        std::future::pending::<()>().await;
    });
}

(Playground)

But then you can't check for a termination flag. Instead of using pending, you could attempt to read a value from a channel, and return from the async move when the channel got closed.

I'm not too familiar with tokio to tell you how it's done idiomatically. But I think you can use the spawn method on a runtime handle to add tasks.


You should try to avoid this, because it would reduce responsiveness:

It would be a suboptimal workaround.

I do not think that waiting on the thread sync primitive (like a channel or a condition variable) would work.
There is only one thread, that runs the tokio runtime in this example, and it will be blocked, preventing other tasks from running.
pending - will work, unless I need a way to terminate the thread somehow.

You need an async channel of course.

Thanks! As I said I'm new to this and was not aware of such thing as async thread sync primitives.

I think for checking a request for termination, a tokio::sync::watch channel might be suitable. tokio::sync::watch::Receiver::changed, for example, is an async method, which will yield control back to the async runtime instead of blocking the thread when waiting.

1 Like

Here an example of using tokio's mpsc channel to achieve shutting down the tokio runtime based on an external signal:

use tokio::runtime::Builder;
use tokio::sync::mpsc;

use std::thread;
use std::time::Duration;

fn main() {
    let rt = Builder::new_current_thread().enable_all().build().unwrap();
    let handle = rt.handle().clone();
    
    let (tx, mut rx) = mpsc::channel(1);
    
    thread::spawn(move || {
        rt.block_on(async {
            rx.recv().await;
        });  
    });
    
    handle.spawn(async {
        println!("hello from task1");
    });
    
    handle.spawn(async {
        println!("hello from task2");
    });
    
    // to make sure we shutdown runtime after handling the two spawned 
    // tasks first
    thread::sleep(Duration::from_secs(4));
    
    handle.spawn(async move {
        tx.send(()).await.unwrap();
    });
}

Playground.

2 Likes

I guess tokio::sync::mpsc has the easier to use interface, so maybe that's better than watch.

I think this boils down to personal preference :slightly_smiling_face:

watch might be preferrable if you have multiple receivers / multiple tasks that need to be shut down.

There is even tokio::sync::oneshot - Rust

Oh, right, forgot about that one. This'd be perfect for shutting down a tokio runtime.

Note that tokio::sync::oneshot::Sender::send will consume self, and you can't clone the Sender, so this might be a limitation because you can only terminate the task from one place (unless you wrap the sender in a Mutex and Option, which would be rather clunky). I thus believe that oneshot generally is not what you want/need for this purpose.


In contrast, tokio::sync::watch::Sender::send only requires a shared reference (which can be copied or made available through an Arc). Side note: I would prefer send_replace if there is a variable number of receivers as it will also work when there are no receivers at the time of sending the termination request:

Additionally, this method permits sending values even when there are no receivers.

And if you use tokio::sync::mpsc::Sender instead, the sender can be cloned. So no problem there either.

I would probably use mpsc if you have a single receiver, and watch if you have multiple receivers.

Actually, I have a struct that keeps runtime Handle, thread JoinHandle, and channel Sender. And yes I wrap it all in the Option, because the same problem is with thread JoinHandle, JoinHandle::join consumes the JoinHandle. So no problem with oneshot, I already have an Option to wrap it.

Ah okay, if that works for you (I haven't read the Playground yet) or you don't mind the Sender being consumed, you can use oneshot of course. It might have the least overhead then.


I see your need for Option is that Drop::drop only gets a &mut Self and can't move out of self. I think it's necessary and idiomatic to use an Option there (if you don't use unsafe, which I wouldn't). An alternative would be to replace the used values with a dummy value (e.g. a dummy Sender), but that would be more overhead and awkward, I guess.