Thread 'tokio-runtime-worker' panicked at 'thread 'called `Result::unwrap()` on an `Err` value: Io(Os { code: 24, kind: Other, message: "Too many open files" })

Hello,
I made a tool with Rust and I use tokio::spawn to use multiple worker in my tool.
The issue I have is that when my tool wants to run too much workers I got the following error :

thread 'tokio-runtime-worker' panicked at 'thread 'called Result::unwrap() on an Err value: Io(Os { code: 24, kind: Other, message: "Too many open files" })

The problem doesn't appear when I run ulimit -n 999999 before running my tool.

But do you know how should I fix this issue without the need to use ulimit before running my tool ?
Thank you

1 Like

You could increase the limit within the process by calling setrlimit.

Thank you, is it safe for the user ? The OS will not get slow or something ?
Is it something a developper should do ?

It depends whether you intended to open many files. For example, if you do:

for path in million_file_paths {
    spawn(async_open_file(path));
}

then it may try to open a million files at once.

You may want to limit concurrency, e.g. buffered Stream can be used to process only limited number of async things at a time.

1 Like

If I have this code :

async fn http_status_urls(urls: Vec<String>, delay: u64, color: bool, verbose: bool) -> String {
    if verbose {
        println!("We're checking status of {} urls... ", urls.len())
    };
    let mut join_handles = Vec::with_capacity(urls.len());
    for url in urls {
        join_handles.push(tokio::spawn(async move {
            let status_ret = run_status(url.as_str(), color).await;
            if delay > 0 {
                let delay_time = time::Duration::from_millis(delay);
                thread::sleep(delay_time);
            }
            status_ret
        }));
    }

    let mut ret: String = String::new();
    for handle in join_handles {
        let ret_url: String = handle.await.expect("panic in check http status");
        ret.push_str(ret_url.as_str());
    }
    ret
}

What would be the best way to avoid making my computer struggles with bad "ulimit" and still have a quick program, please ?

You can spawn a fixed number and keep track of them using FuturesUnordered, which allows waiting until any single future completes in the set. Alternatively spawn a fixed number and send the responses home using channels, and each time a task completes, spawn another one.

Thank for you help !
And I don't understand why there is a really long time between the join_all or

for handle in join_handles {
        let ret_url: String = handle.await.expect("panic in check http status");
        ret.push_str(ret_url.as_str());
}

and the first println! display (the println is in the run_status function),

Shouldn't be while the run_status are executed so directly ?

In this example, the "middle...." is printed directly and the first println! in run_status is way later, why is it not directly ?

async fn http_status_urls(urls: Vec<String>, delay: u64, color: bool, verbose: bool) -> String {
    if verbose {
        println!("We're checking status of {} urls... ", urls.len())
    };
    let mut join_handles : Vec<task::JoinHandle<String>>= vec![];
    for url in urls {
        join_handles.push(tokio::spawn(async move {
            let status_ret = run_status(url.as_str(), color).await;
            if delay > 0 {
                let delay_time = time::Duration::from_millis(delay);
                thread::sleep(delay_time);
            }
            status_ret
        }));
    }
    println!("middle...........");
    let mut ret: String = String::new();
    for handle in join_handles {
        let ret_url: String = handle.await.expect("panic in check http status");
        ret.push_str(ret_url.as_str());
    }
    ret
}

You're using thread::sleep to sleep in an async function. This is a very bad idea and will incredibly quickly exhaust all of the executor's threads. It only has four or eight depending on your cpu, so once you're running four or eight sleeps, you're out of threads. You need to use Tokio's delay_for, which gives control of the thread back to Tokio during the sleep. Consider reading this thread.

4 Likes

It's weird to use tokio::spawn in a loop like that. You're adding unnecessary async-on-top-of-async and needless synchronization between independent tasks that you make dependent.
It's OK to use spawn once to launch your whole program on a chosen runtime, or to launch some big tasks that truly need to run in the background.

But for simple tasks of waiting for a bunch of futures, there are more appropriate mechanisms:

  • For handful of futures, collecting into Vec and join_all is fine

  • For a few futures, collecting into FuturesUnordered has lower overhead

  • For large number of futures, you should make them a Stream and set how many of them should be running at the same time via buffered(n) or buffer_unordered(n). Otherwise when all of them are spawned at the same time, you'll run out of memory, file descriptors, etc.

1 Like

Thank you,
I'll use delay_for instead of sleep.
But can you explain me what is taking so much time between the print of "middle....." and the first println! in status_url ? It's taking so much time even when delay is 0 and sleep is not executed

Thank you for you help.
I've been inspired by this code , is it wrong too or if this code is correct, why mine isn't ?

// Iterate over the paths.
let mut tasks: Vec<JoinHandle<Result<(), ()>>>= vec![];
for path in paths {

    // Copy each path into a new string
    // that can be consumed/captured by the task closure
    let path = path.clone();

    // Create a Tokio task for each path
    tasks.push(tokio::spawn(async move {
        match reqwest::get(&path).await {
            Ok(resp) => {
                match resp.text().await {
                    Ok(text) => {
                        println!("RESPONSE: {} bytes from {}", text.len(), path);
                    }
                    Err(_) => println!("ERROR reading {}", path),
                }
            }
            Err(_) => println!("ERROR downloading {}", path),
        }
        Ok(())
    }));
}

// Wait for them all to finish
println!("Started {} tasks. Waiting...", tasks.len());
join_all(tasks).await;

It might be that they aren't completing in order, but you start out by waiting for the first one.

Thank you for your help,
I almost made my function works like expected, I just need to add the result of each request to a String but I can't :

async fn http_status_urls(urls: Vec<String>, delay: u64, color: bool, verbose: bool) -> String {
    if verbose {
        println!("We're checking status of {} urls... ", urls.len())
    };
    const PARALLEL_REQUESTS: usize = 8;
    let bodies = stream::iter(urls)
        .map(|url| {
            async move {
                if delay > 0 {
                    let delay_time = time::Duration::from_millis(delay);
                    // thread::sleep(delay_time); TO CHANGE
                }
                reqwest::get(&url).await
            }
        })
        .buffer_unordered(PARALLEL_REQUESTS);
    let mut ret: String = String::new();
    bodies
        .for_each(|b| {
            async {
                match b {
                    Ok(response) => {
                        let str_output = if color {
                            format!("{} {}\n",&response.url(),colorize(&response))
                        } else {
                            format!("{} {}\n", &response.url(), &response.status())
                        };
                        print!("{}", str_output);
                       ret.push_str(&str_output); //returns a reference to a captured variable which escapes the closure body

                    }
                    Err(e) => {
                        eprintln!("error geting : {}", e);
                    }
                }
            }
        })
        .await;
    ret
}

How can I make my push_str work ?
I need to get the result of all request in a String

Does it work if you make the async block (but not the closure) move?

bodies.for_each(|b| async move { ...

Since the closure is non-move, it takes a reference to ret, but then you want the async block to take ownership of that reference, instead of storing a reference to the reference, which I'm guessing is the issue.

Since you use ret after the closure, it'll probably also work if you mark the closure as move, as move-closures only actually take ownership if you don't use it later.

Like that ?

    let mut ret: String = String::new();
    bodies
        .for_each(|b| async move {
                match b {
                    Ok(response) => {
                        let str_output = if color {
                            format!("{} {}\n",&response.url(),colorize(&response))
                        } else {
                            format!("{} {}\n", &response.url(), &response.status())
                        };
                        print!("{}", str_output);
                        ret.push_str(&str_output);
                    }
                    Err(e) => {
                        eprintln!("error geting : {}", e);
                    }
                }
            })
        .await;
    ret

I get :

error[E0507]: cannot move out of `ret`, a captured variable in an `FnMut` closure
   --> src/main.rs:465:34
    |
463 |       let mut ret: String = String::new();
    |           ------- captured outer variable
464 |       bodies
465 |           .for_each(|b| async move {
    |  __________________________________^
466 | |                 match b {
467 | |                     Ok(response) => {
468 | |                         let str_output = if color {
...   |
474 | |                         ret.push_str(&str_output);
    | |                         ---
    | |                         |
    | |                         move occurs because `ret` has type `std::string::String`, which does not implement the `Copy` trait
    | |                         move occurs due to use in generator
...   |
479 | |                 }
480 | |             })
    | |_____________^ move out of `ret` occurs here

error[E0382]: use of moved value: `ret`
   --> src/main.rs:482:5
    |
463 |     let mut ret: String = String::new();
    |         ------- move occurs because `ret` has type `std::string::String`, which does not implement the `Copy` trait
464 |     bodies
465 |         .for_each(|b| async move {
    |                   --- value moved into closure here
...
474 |                         ret.push_str(&str_output);
    |                         --- variable moved due to use in closure
...
482 |     ret
    |     ^^^ value used here after move

error: aborting due to 2 previous errors

Yeah, I don't have an example I can play around with myself, so I can't check if it works. What happens if you do this?

let mut ret = String::new();
let ret_mut = &mut ret;
bodies.for_each(move |b| async move {
    // ... use ret_mut here
}).await;
ret

The alternative is to not use for_each.

let mut ret = String::new();
while let Some(b) = bodies.next().await {
    // use ret here
}
ret

The first method doesn't work, but the second works perfectly !
Thank you !

1 Like

This topic was automatically closed 90 days after the last reply. New replies are no longer allowed.