Event loop in Rust

Hello,
I need help with a Rust-based event loop crate. Is there any high-level event loop library in Rust? Basically, I want to pass a method/closure to an event loop and have it executed periodically at regular intervals. I checked the calloop library. I was able to add a callback to it, but when I call dispatch/run explicitly, it blocks until the callback is called. What I want is an event loop implementation that doesn't block and executes the callback in a separate thread. I want to post callbacks from different files and have them executed without blocking. Any direction on this will be extremely helpful.

From your description it sounds like an XY problem. What exact problem are you trying to solve? Are you looking for a general-purpose thread pool implementation, or some kind of async runtime? Is your callback CPU-bound or IO-bound? Do you need to integrate the event loop with other event loops, such as a GUI's? There are many solutions to many different problems, so please state your problem more clearly.


It sounds like you're processing files concurrently, so I'm assuming you want to do IO-bound concurrency.

The most common way to do non-blocking I/O in Rust is to use an asynchronous runtime like Tokio, smol or async-std.

Taking Tokio as an example, you can launch a repeating task in the following way:

use tokio::task;
use tokio::time::{self, Duration};

#[tokio::main]
async fn main() {
    let repeating_task = task::spawn(async {
        let mut interval = time::interval(Duration::from_secs(2));
        loop {
            // wait 2 secs
            // Note: doesn't wait the very first time it's called
            interval.tick().await;
            // do stuff here
        }
    });

    // launch other tasks, etc

    // all tasks stop running when the main thread exits, so we 
    // should block here to stop that from happening
    let _ = tokio::join!(repeating_task, /* ...other tasks */);
}

I want to make a network call every few seconds, periodically. The number of such periodic requests will be large (more than 50), so I don't want to create a thread for each of them. Instead, I want to add each request to the event loop, e.g. add requestA, process it in a separate thread after x seconds, and then add requestA to the event loop again. Similarly for many other requests.

Hi,
I tried this, but the number of concurrent requests will be large and I do not want to create a blocking thread for each of them. Instead, I want to add requests to an event loop and then process them after a delay.

If you use Tokio, then you can use a tokio::sync::Semaphore to put a limit on the number of concurrent file operations.
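
A minimal sketch of what that could look like (the limit of 10 is just an arbitrary example, not something your problem requires):

use std::sync::Arc;
use tokio::sync::Semaphore;

// shared between all tasks; allows at most 10 operations at a time
let semaphore = Arc::new(Semaphore::new(10));

// inside each task, hold a permit for the duration of one operation:
let permit = semaphore.clone().acquire_owned().await.unwrap();
// ... do the operation ...
drop(permit); // releasing the permit frees the slot for another task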


Hi,
I tried that. But each of the operations needs to be executed periodically, so all the semaphore permits will stay taken and I can't process a new request.

I'm not sure I understand. If the semaphore is full, then it will block requests until one of the running ones finishes. But is that not what you wanted?


Right. In my case each request will be periodically executed and never finishes. It will be idle for x seconds and then start again.

In that case, I would only protect the non-idle regions with a semaphore. Something like this:

tokio::spawn(async move {
    loop {
        tokio::time::sleep(idle_duration).await;

        // `semaphore` is an Arc<tokio::sync::Semaphore>; acquire an owned permit
        let sem = semaphore.clone().acquire_owned().await.unwrap();
        tokio::task::spawn_blocking(move || {
            // do file operations
            drop(sem); // release the permit once the blocking work is done
        }).await.unwrap();
    }
});

(this is a sketch, I did not try to compile it)


This looks fine to me, but will this work if there are 100s of such file operations executing periodically?

Yes. Tokio is designed such that you can have millions of tokio::spawn tasks running without problems.


Ohh okay. Thanks for the information. Even if the code below is executed on a Tokio thread, will it handle 100s of operations?

operation 1:
loop {
    make_sync_call(x1);
    // sleep for 1 sec
}

operation 2:
loop {
    make_sync_call(x2);
    // sleep for 2 secs
}
.
.
operation 100:
loop {
    make_sync_call(x100);
    // sleep for 100 secs
}

Yes, as long as your sync calls are wrapped in spawn_blocking, that should be fine.
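
To sketch roughly what that could look like (make_sync_call here is just a placeholder for your blocking network call, and the per-request delays are taken from your pseudocode above):

use std::time::Duration;

// placeholder for the blocking call in the pseudocode above
fn make_sync_call(id: u64) {
    println!("request {}", id);
}

#[tokio::main]
async fn main() {
    let mut handles = Vec::new();
    for i in 1..=100u64 {
        handles.push(tokio::spawn(async move {
            loop {
                // each operation waits out its own interval without holding a thread
                tokio::time::sleep(Duration::from_secs(i)).await;
                // the blocking call runs on Tokio's dedicated blocking thread pool
                tokio::task::spawn_blocking(move || make_sync_call(i))
                    .await
                    .unwrap();
            }
        }));
    }

    // these loops never finish, so this keeps the program alive
    for handle in handles {
        let _ = handle.await;
    }
}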

See this article on why you need spawn_blocking: Async: What is blocking?

See also this article: How Much Memory Do You Need to Run 1 Million Concurrent Tasks?


I will go through this and get back to you. Thanks for the info. It was very helpful.

This seems to be exactly what I want. I will go through the articles in detail. Thank you for the solution @alice

You're welcome.

Hi @alice, I tried your approach, where I added a method to the event loop and wrapped all event loop operations inside spawn_blocking, but I got an error, expected an FnOnce<()> closure, found impl Future<Output = ()>, when I tried to call the function taken as an argument.
Any suggestions on this would be extremely helpful, thanks in advance.

The spawn_blocking function only works with sync code. You cannot give it async code.
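
To make the distinction concrete, a small sketch (do_work and do_blocking_work are hypothetical stand-ins for the function you're passing around):

use std::time::Duration;

// hypothetical async callback
async fn do_work() {
    tokio::time::sleep(Duration::from_secs(1)).await;
}

// hypothetical sync (blocking) callback
fn do_blocking_work() {
    std::thread::sleep(Duration::from_secs(1));
}

#[tokio::main]
async fn main() {
    // this does NOT compile: spawn_blocking wants an FnOnce closure, not a Future
    // tokio::task::spawn_blocking(do_work());

    // async code is spawned on the async runtime instead:
    tokio::spawn(do_work()).await.unwrap();

    // spawn_blocking is for plain blocking sync code:
    tokio::task::spawn_blocking(do_blocking_work).await.unwrap();
}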

To unpack that error a bit more:

  • FnOnce is a trait implemented by functions that you can call once.
    • for example, the closure (anonymous function) move || unique_object.consume() destroys the unique_object value when called, so you can call it once, but not again after that.
    • on the other hand, || println!("whatever") can be run as many times as you like, so it is Fn as well as being FnOnce
  • FnOnce<()> (which normally looks like FnOnce(), not sure why it doesn't here) means that the function must take no arguments.
  • impl Future<Output = ()> is a bit scary looking but, breaking it down:
    • impl SomeTrait means an un-named implementation of SomeTrait
    • Future is the trait for async code. You can create one by:
      • evaluating an async block: let future = async { /* some code */ } (or async move { })
      • calling an async function, e.g. async fn foo() { /* some code */ } then let future = foo();
      • manually implementing and constructing the Future trait (you don't want to do this)
    • the <Output = ()> means that the Future implementation must have its associated type Output equal to () (which is Rust for nothing, e.g. void, if you haven't seen it before). So it's essentially saying the Future must evaluate to (), which is what you get if you don't return anything. For example async {} and the result of async fn empty() {} are both impl Future<Output = ()>

To explain why they want different types, it helps to have a rough idea of what tokio is actually doing:

spawn_blocking wants an FnOnce because it's going to call it on a dedicated worker thread in the background, e.g.:

spawn_blocking(|| {
  std::thread::sleep(Duration::from_secs(10));
  println!("blocking in the background");
})

will grab a worker thread, send the || ... function to it and then immediately return, and the worker thread will call the function, which then puts that worker thread to sleep for 10 seconds (so nobody else can use it), then prints in the background.

On the other hand spawn wants a Future directly, without any function, because in Rust Futures don't do anything until they are polled. It then puts it in a big list of Futures that need polling, and the shared worker threads will pull it off and poll it, running any code until it either finishes or it hits a pending .await. E.g.:

spawn(async {
  tokio::time::sleep(Duration::from_secs(10)).await;
  println!("async in the background");
})

will create an anonymous Future for the async {} block, spawn will put it on the task list, it will get polled, it will hit the sleep().await, register to wake in 10 seconds, and return pending. After 10 seconds it will get put back on the task list, get polled again, resume after the .await, print, then run to completion and be removed.

In both cases, the spawn function will immediately return, and the code runs on a background thread, but they implement it in very different ways, because they receive very different types.
