Hello,
I need help finding a Rust event loop crate. Is there a high-level event loop library in Rust? Basically, I want to pass a method or closure to an event loop and have it executed periodically at regular intervals. I checked the calloop library. I was able to add a callback to it, but when I call dispatch/run explicitly, it blocks until the callback is called. What I want is an event loop implementation that doesn't block and executes the callback in a separate thread. I want to post callbacks from different files and have them executed without blocking. Any direction on this would be extremely helpful.
from your description it sounds like an XY problem. what exact problem are you trying to solve? are you looking for a general purpose thread pool implementation? or some kind of async runtime? is your callback CPU bound or IO bound? do you need to integrate the event loop with other event loops such as a GUI? there are so many solutions for different problems. please state your problem more clearly.
It sounds like you're processing files concurrently, so I'm assuming you want to do IO-bound concurrency.
The most common way to do non-blocking I/O in Rust is to use an asynchronous runtime like Tokio, smol or async-std.
Taking Tokio as an example, you can launch a repeating task in the following way:
use tokio::task;
use tokio::time::{self, Duration};

#[tokio::main]
async fn main() {
    let repeating_task = task::spawn(async {
        let mut interval = time::interval(Duration::from_secs(2));
        loop {
            // wait 2 secs
            // Note: doesn't wait the very first time it's called
            interval.tick().await;
            // do stuff here
        }
    });
    // launch other tasks, etc
    // all tasks stop running when the main thread exits, so we
    // should block here to stop that from happening
    let _ = tokio::join!(repeating_task, /* ...other tasks */);
}
I want to make a network call periodically, every few seconds. There will be a large number of such periodic requests (more than 50), so I don't want to create a thread for each of them. Instead, I want to add each request to the event loop: for example, add requestA to the event loop, process it in a separate thread after x seconds, and then add requestA to the event loop again. The same goes for many other requests.
Hi,
I tried this, but the number of concurrent requests will be large and I do not want to create a blocking thread for each of them. Instead I want to add requests to the event loop and then process them after a delay.
If you use Tokio, then you can use a tokio::sync::Semaphore to put a limit on the number of concurrent file operations.
Hi,
I tried that. But each of the operations needs to be executed periodically, so all the semaphore permits will be taken and I can't process a new request.
I'm not sure I understand. If the semaphore is full, then it will block requests until one of the running ones finish. But is that not what you wanted?
Right. In my case each request will be periodically executed and never finishes. It will be idle for x seconds and then start again.
In that case, I would only protect the non-idle regions with a semaphore. Something like this:
tokio::spawn(async move {
    loop {
        tokio::time::sleep(idle_duration).await;
        // acquire_owned returns a Result; it only fails if the semaphore is closed
        let permit = semaphore.clone().acquire_owned().await.expect("semaphore closed");
        tokio::task::spawn_blocking(move || {
            // do file operations
            drop(permit);
        }).await.unwrap();
    }
});
(this is a sketch, I did not try to compile it)
This looks fine to me, but will it work if there are 100s of such file operations executing periodically?
Yes. Tokio is designed such that you can have millions of tokio::spawn tasks running without problems.
Oh, okay. Thanks for the information. If the code below is executed on Tokio threads, will it still handle 100s of operations?
operation 1:
loop {
    make_sync_call(x1);
    // sleep for 1 sec
}
operation 2:
loop {
    make_sync_call(x2);
    // sleep for 2 secs
}
.
.
operation 100:
loop {
    make_sync_call(x100);
    // sleep for 100 secs
}
Yes, as long as your sync calls are wrapped in spawn_blocking, that should be fine.
See this article on why you need spawn_blocking: Async: What is blocking?
See also this article: How Much Memory Do You Need to Run 1 Million Concurrent Tasks?
I will go through this and get back to you. Thanks for the info. It was very helpful.
This seems to be exactly what I want. I will go through the articles in detail. Thank you for the solution @alice
You're welcome.
Hi @alice, I tried your approach: I added a method to the event loop, with all event loop operations wrapped inside spawn_blocking. But I got the error "expected an FnOnce<()> closure, found impl Future<Output = ()>" at the point where I call a function taken as an argument.
Any suggestions on this would be extremely helpful. Thanks in advance.
The spawn_blocking function only works with sync code. You cannot give it async code.
To unpack that error a bit more:
- FnOnce is a trait implemented by functions that you can call at least once.
  - for example, if the closure (anonymous function) move || unique_object.consume() will destroy the unique_object value when called, you can call it once, but not again after that.
  - on the other hand, || println!("whatever") can be run as many times as you like, so it is Fn as well as being FnOnce
- FnOnce<()> (which normally looks like FnOnce(), not sure why it doesn't here) means that the function must take no arguments. The return type is specified separately; spawn_blocking hands back whatever the closure returns.
- impl Future<Output = ()> is a bit scary looking but, breaking it down:
  - impl SomeTrait means an un-named implementation of SomeTrait
  - Future is the trait for async code. You can create one by:
    - evaluating an async block: let future = async { /* some code */ } (or async move { })
    - calling an async function, e.g. async fn foo() { /* some code */ } then let future = foo();
    - manually implementing and constructing the Future trait (you don't want to do this)
  - the <Output = ()> means that the implementation of the Future trait must have the associated type Output (defined for every Future) equal to () (which is Rust for nothing, e.g. void, if you haven't seen it before). So it's essentially saying the Future implementation must evaluate to (), which is what you get if you don't return anything. For example async {} and the result of async fn empty() {} are both impl Future<Output = ()>
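The points in the list above can be checked with a small standard-library-only sketch. The helpers call_once, call_twice, poll_once and NoopWaker are made up for this illustration; the do-nothing waker exists only so a future can be polled by hand, and is not how a real runtime like Tokio drives futures.

```rust
use std::cell::Cell;
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Accepts anything callable at least once (every closure qualifies).
fn call_once<F: FnOnce() -> usize>(f: F) -> usize {
    f()
}

// Accepts only closures that can be called repeatedly.
fn call_twice<F: Fn() -> usize>(f: F) -> usize {
    f() + f()
}

// A waker that does nothing, just enough to poll a future manually.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Polls a future exactly once and reports whether it finished.
fn poll_once<F: Future>(future: F) -> Poll<F::Output> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut future = pin!(future);
    future.as_mut().poll(&mut cx)
}

fn demo() -> (usize, usize, bool, bool) {
    // This closure consumes `unique_object`, so it is FnOnce but not Fn;
    // passing it to `call_twice` would not compile.
    let unique_object = String::from("consumed");
    let consumed_len = call_once(move || unique_object.into_bytes().len());

    // This closure consumes nothing, so it is Fn (and therefore FnOnce too).
    let doubled = call_twice(|| 21);

    // Creating a future runs none of its code; polling does.
    let ran = Cell::new(false);
    let future = async {
        ran.set(true);
    };
    let ran_before_poll = ran.get();
    let finished = poll_once(future).is_ready();
    let ran_after_poll = ran.get();

    (consumed_len, doubled, ran_before_poll, finished && ran_after_poll)
}

fn main() {
    println!("{:?}", demo()); // prints (8, 42, false, true)
}
```

The async block here has type impl Future<Output = ()>, which is exactly the type named in the error message; it only does something once something polls it.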
To explain why they want different types, it helps to have a rough idea of what tokio is actually doing:
spawn_blocking wants an FnOnce because it's going to call it on a dedicated worker thread in the background, e.g.:
spawn_blocking(|| {
    std::thread::sleep(Duration::from_secs(10));
    println!("blocking in the background");
})
will grab a worker thread, send the || ... function to it and then immediately return, and the worker thread will call the function, which then puts that worker thread to sleep for 10 seconds (so nobody else can use it), then prints in the background.
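The "send the function to a worker thread and return immediately" idea can be modelled with the standard library alone. The sketch below is a toy, not Tokio's implementation: ToyBlockingWorker is a made-up type with a single worker thread, and it returns a channel receiver where Tokio would return a JoinHandle.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

// A boxed closure that can be called once and sent to another thread.
type Job = Box<dyn FnOnce() + Send + 'static>;

// Toy model (assumption): one dedicated thread that runs queued closures.
struct ToyBlockingWorker {
    jobs: mpsc::Sender<Job>,
}

impl ToyBlockingWorker {
    fn new() -> Self {
        let (jobs, inbox) = mpsc::channel::<Job>();
        thread::spawn(move || {
            // Runs each job to completion; exits when all senders are dropped.
            for job in inbox {
                job();
            }
        });
        Self { jobs }
    }

    // Queues `f` for the worker and returns immediately with a receiver
    // that will eventually yield the closure's return value.
    fn spawn_blocking<F, R>(&self, f: F) -> mpsc::Receiver<R>
    where
        F: FnOnce() -> R + Send + 'static,
        R: Send + 'static,
    {
        let (result_tx, result_rx) = mpsc::channel();
        self.jobs
            .send(Box::new(move || {
                // Ignore the error if the caller no longer wants the result.
                let _ = result_tx.send(f());
            }))
            .expect("worker thread stopped");
        result_rx
    }
}

fn demo() -> u32 {
    let worker = ToyBlockingWorker::new();
    let result = worker.spawn_blocking(|| {
        thread::sleep(Duration::from_millis(50)); // occupies the worker thread
        7
    });
    // The caller is free to do other work here while the worker sleeps.
    result.recv().expect("worker dropped the result")
}

fn main() {
    println!("worker returned {}", demo());
}
```

Because the worker calls each closure exactly once, FnOnce is the weakest bound that works, which is why a closure is required here and a future is not accepted.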
On the other hand, spawn wants a Future directly, without any function, because in Rust Futures don't do anything until they are polled. It then puts it in a big list of Futures that need polling, and the shared worker threads will pull it off and poll it, running any code until it either finishes or it hits a pending .await. E.g.:
spawn(async {
    tokio::time::sleep(Duration::from_secs(10)).await;
    println!("async in the background");
})
will create an anonymous Future for the async {} block, spawn will put it on the task list, it will get polled, it will hit the sleep().await, register to wake in 10 seconds, and return pending. After 10 seconds it will get put back on the task list, get polled again, resume after the .await, print, then run to completion and be removed.
In both cases, the spawn function will immediately return, and the code runs on a background thread, but they implement it in very different ways, because they receive very different types.