Hello,
I made a tool in Rust, and I use tokio::spawn to run multiple workers in it.
The issue is that when my tool tries to run too many workers, I get the following error:
thread 'tokio-runtime-worker' panicked at 'thread 'called Result::unwrap() on an Err value: Io(Os { code: 24, kind: Other, message: "Too many open files" })
The problem doesn't appear when I run ulimit -n 999999 before running my tool.
Do you know how I should fix this issue without having to run ulimit before my tool?
Thank you
You can spawn a fixed number and keep track of them using FuturesUnordered, which allows waiting until any single future completes in the set. Alternatively spawn a fixed number and send the responses home using channels, and each time a task completes, spawn another one.
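As a rough illustration of the "fixed number of workers plus channels" idea, here is a minimal sketch. To stay dependency-free it uses OS threads and std::sync::mpsc rather than Tokio tasks; with Tokio you would use tokio::spawn and an async channel instead, but the shape is the same. The name run_bounded, the u32 jobs and the squaring "work" are placeholders I made up, not anything from your tool.

```rust
use std::collections::VecDeque;
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

/// Runs every job on at most `workers` threads and returns the results
/// in completion order (not input order).
/// Hypothetical helper: squaring a number stands in for the real work
/// (for example, one HTTP request).
fn run_bounded(jobs: Vec<u32>, workers: usize) -> Vec<u32> {
    // Shared queue of pending jobs; each worker pulls from it when idle.
    let queue = Arc::new(Mutex::new(VecDeque::from(jobs)));
    let (tx, rx) = mpsc::channel();

    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let queue = Arc::clone(&queue);
            let tx = tx.clone();
            thread::spawn(move || loop {
                // Take the next job; the lock is released at the end of this statement.
                let job = queue.lock().unwrap().pop_front();
                match job {
                    // Send the result "home" over the channel.
                    Some(n) => tx.send(n * n).unwrap(),
                    // Queue is empty: this worker is done.
                    None => break,
                }
            })
        })
        .collect();

    // Drop the original sender so `rx` ends once every worker has finished.
    drop(tx);

    let results: Vec<u32> = rx.iter().collect();
    for handle in handles {
        handle.join().unwrap();
    }
    results
}
```

Calling run_bounded((1..=10).collect(), 3) never has more than three jobs in flight, so the number of open sockets or files is capped by the worker count rather than by the number of jobs.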
You're using thread::sleep to sleep in an async function. This is a very bad idea: it blocks the executor thread it runs on and will very quickly exhaust all of the executor's threads. By default the runtime has only one worker thread per CPU core (typically four or eight), so once you're running that many sleeps, you're out of threads. You need to use Tokio's delay_for, which gives control of the thread back to Tokio during the sleep. Consider reading this thread.
It's weird to use tokio::spawn in a loop like that. You're adding unnecessary async-on-top-of-async and needless synchronization between independent tasks that you make dependent.
It's OK to use spawn once to launch your whole program on a chosen runtime, or to launch some big tasks that truly need to run in the background.
But for simple tasks of waiting for a bunch of futures, there are more appropriate mechanisms:
For a handful of futures, collecting into a Vec and using join_all is fine.
For a few futures, collecting into FuturesUnordered has lower overhead
For a large number of futures, you should make them a Stream and limit how many of them run at the same time via buffered(n) or buffer_unordered(n). Otherwise, when all of them are spawned at once, you'll run out of memory, file descriptors, etc.
Thank you,
I'll use delay_for instead of sleep.
But can you explain to me what is taking so much time between printing "middle....." and the first println! in status_url? It takes a long time even when delay is 0 and sleep is not executed.
Thank you for your help.
I was inspired by this code. Is it wrong too? Or, if this code is correct, why isn't mine?
// Iterate over the paths.
let mut tasks: Vec<JoinHandle<Result<(), ()>>> = vec![];
for path in paths {
    // Copy each path into a new string
    // that can be consumed/captured by the task closure
    let path = path.clone();
    // Create a Tokio task for each path
    tasks.push(tokio::spawn(async move {
        match reqwest::get(&path).await {
            Ok(resp) => {
                match resp.text().await {
                    Ok(text) => {
                        println!("RESPONSE: {} bytes from {}", text.len(), path);
                    }
                    Err(_) => println!("ERROR reading {}", path),
                }
            }
            Err(_) => println!("ERROR downloading {}", path),
        }
        Ok(())
    }));
}
// Wait for them all to finish
println!("Started {} tasks. Waiting...", tasks.len());
join_all(tasks).await;
Does it work if you make the async block (but not the closure) move?
bodies.for_each(|b| async move { ...
Since the closure is non-move, it takes a reference to ret. You then want the async block to take ownership of that reference rather than store a reference to the reference, which I'm guessing is the issue.
Since you use ret after the closure, it'll probably also work if you mark the closure as move, as move-closures only actually take ownership if you don't use it later.
let mut ret: String = String::new();
bodies
    .for_each(|b| async move {
        match b {
            Ok(response) => {
                let str_output = if color {
                    format!("{} {}\n", &response.url(), colorize(&response))
                } else {
                    format!("{} {}\n", &response.url(), &response.status())
                };
                print!("{}", str_output);
                ret.push_str(&str_output);
            }
            Err(e) => {
                eprintln!("error geting : {}", e);
            }
        }
    })
    .await;
ret
I get:
error[E0507]: cannot move out of `ret`, a captured variable in an `FnMut` closure
--> src/main.rs:465:34
|
463 | let mut ret: String = String::new();
| ------- captured outer variable
464 | bodies
465 | .for_each(|b| async move {
| __________________________________^
466 | | match b {
467 | | Ok(response) => {
468 | | let str_output = if color {
... |
474 | | ret.push_str(&str_output);
| | ---
| | |
| | move occurs because `ret` has type `std::string::String`, which does not implement the `Copy` trait
| | move occurs due to use in generator
... |
479 | | }
480 | | })
| |_____________^ move out of `ret` occurs here
error[E0382]: use of moved value: `ret`
--> src/main.rs:482:5
|
463 | let mut ret: String = String::new();
| ------- move occurs because `ret` has type `std::string::String`, which does not implement the `Copy` trait
464 | bodies
465 | .for_each(|b| async move {
| --- value moved into closure here
...
474 | ret.push_str(&str_output);
| --- variable moved due to use in closure
...
482 | ret
| ^^^ value used here after move
error: aborting due to 2 previous errors
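For what it's worth, E0507 comes from the closure being called once per item: each call builds a fresh async move block, and that block would have to take ret out of the closure, which can only happen once. One way around it is to pass the accumulator through each step by value and hand it back, which is what fold does. Below is a minimal sketch using a plain Iterator so that it runs without any crates; StreamExt::fold from the futures crate has the same shape, except that the closure returns a future. The function name collect_lines and the (url, status) tuples are stand-ins I made up for your responses.

```rust
/// Hypothetical stand-in for the stream of responses: each item is either
/// Ok((url, status)) or Err(message).
fn collect_lines(items: Vec<Result<(String, u16), String>>) -> String {
    // `ret` is moved into each step and returned from it, so there is
    // never a borrow or a second move of the same variable.
    items.into_iter().fold(String::new(), |mut ret, item| {
        match item {
            Ok((url, status)) => {
                let line = format!("{} {}\n", url, status);
                print!("{}", line);
                ret.push_str(&line);
            }
            Err(e) => eprintln!("error getting: {}", e),
        }
        // Hand the accumulator back so the next step owns it.
        ret
    })
}
```

With a stream, the equivalent would look roughly like bodies.fold(String::new(), |mut ret, b| async move { /* push to ret */ ret }).await, where the async block takes ownership of ret and returns it at the end.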