Why don't my threads run at the same time?

Hello,
I have an issue with this function:

async fn run_urls(
    domains: Vec<String>,
    subs: bool,
    check: bool,
    output: Option<&str>,
    delay: u64,
    color: bool,
    verbose: bool,
    blacklist: Vec<String>,
    whitelist: Vec<String>,
) {
    let output_string: Arc<Mutex<String>> = Arc::new(Mutex::new(String::new()));

    for domain in domains {
        let black = blacklist.clone();
        let white = whitelist.clone();
        let string = Arc::clone(&output_string);
        tokio::spawn(async move {
            let ret_url = run_url(
                domain,
                subs,
                check,
                delay,
                color,
                verbose,
                black,
                white,
            ).await;
            string.lock().await.push_str(ret_url.as_str());
        }).await.expect("thread error");
    }

    match output {
        Some(file) => {
            write_string_to_file(output_string.lock().await.to_string(), file);
            if verbose {
                println!("urls saved to {}", file)
            };
        }
        None => return,
    }
}

I don't understand why the two run_url calls (there are 2 domains in the "domains" variable) are not running at the same time...
What should I change so that they run at the same time, please?

When you use .await, the current task is suspended until that future completes. That means your for-loop spawns the task for one domain and then waits for it to finish before it even spawns the next one.

I think you're looking for futures::future::join_all() to wait on each spawned task in parallel.


The tokio::spawn function returns a JoinHandle that you can use to wait for the spawned task. Since you await it immediately, the loop waits for that task to finish before going to the next iteration. In this case I would recommend putting the join handles into a vector, then, after spawning all of them, looping through the vector and awaiting each one. The value you return inside the spawned task is returned by the join handle, wrapped in a Result that is an Err if the task panicked.

You don't need join_all when you're also using tokio::spawn.

Don't use a Mutex to return the results. Just return the value from the end of each task and collect it into your output when awaiting the join handles.


I wrote this code. Is this the right way to do it? It looks like it works well:

async fn run_urls(
    domains: Vec<String>,
    subs: bool,
    check: bool,
    output: Option<&str>,
    delay: u64,
    color: bool,
    verbose: bool,
    blacklist: Vec<String>,
    whitelist: Vec<String>,
) {
    let output_string: Arc<Mutex<String>> = Arc::new(Mutex::new(String::new()));
    let mut join_handles = Vec::new();
    for domain in domains {
        let black = blacklist.clone();
        let white = whitelist.clone();
        let string = Arc::clone(&output_string);
        join_handles.push(tokio::spawn(async move {
            let ret_url = run_url(domain, subs, check, delay, color, verbose, black, white).await;
            string.lock().await.push_str(ret_url.as_str());
        }));
    }
    join_all(join_handles).await;

    if let Some(file) = output {
        write_string_to_file(output_string.lock().await.to_string(), file);
        if verbose {
            println!("urls saved to {}", file)
        };
    }
}

You don't need a mutex:

async fn run_urls(
    domains: Vec<String>,
    subs: bool,
    check: bool,
    output: Option<&str>,
    delay: u64,
    color: bool,
    verbose: bool,
    blacklist: Vec<String>,
    whitelist: Vec<String>,
) {
    let mut join_handles = Vec::with_capacity(domains.len());
    for domain in domains {
        let black = blacklist.clone();
        let white = whitelist.clone();
        join_handles.push(tokio::spawn(async move {
            let ret_url = run_url(domain, subs, check, delay, color, verbose, black, white).await;
            ret_url
        }));
    }

    let mut output_string = String::new();
    for handle in join_handles {
        let ret_url = handle.await.expect("panic in run_url");
        output_string.push_str(ret_url.as_str());
    }

    if let Some(file) = output {
        write_string_to_file(output_string, file);
        if verbose {
            println!("urls saved to {}", file)
        };
    }
}

Thank you for your help. It works perfectly, but I'm kind of lost on why it works...

So there is no need for join_all? Why doesn't handle.await behave like tokio::spawn(...).await,
if the handle is just the result of tokio::spawn stored in a vector?

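When you call tokio::spawn, the task is handed to the runtime and starts running in the background right away, whether or not you await its handle. Your new code works because you spawn all of the tasks before you start awaiting any of them, so they are already running concurrently while you wait on the first handle. The join_all function is an alternative to tokio::spawn, which would look like this: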

async fn run_urls(
    domains: Vec<String>,
    subs: bool,
    check: bool,
    output: Option<&str>,
    delay: u64,
    color: bool,
    verbose: bool,
    blacklist: Vec<String>,
    whitelist: Vec<String>,
) {
    let mut run_url_tasks = Vec::with_capacity(domains.len());
    for domain in domains {
        let black = blacklist.clone();
        let white = whitelist.clone();

        // notice no tokio::spawn
        run_url_tasks.push(async move {
            let ret_url = run_url(domain, subs, check, delay, color, verbose, black, white).await;
            ret_url
        });
    }

    // this is a Vec<String>
    let ret_urls = join_all(run_url_tasks).await;

    let mut output_string = String::new();
    for ret_url in ret_urls {
        output_string.push_str(ret_url.as_str());
    }

    if let Some(file) = output {
        write_string_to_file(output_string, file);
        if verbose {
            println!("urls saved to {}", file)
        };
    }
}

Unlike when using tokio::spawn, these futures won't start running until the future returned by join_all is awaited.
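The laziness of a plain async block can be checked with the standard library alone. This is only a sketch: NoopWaker and observe_laziness are hypothetical names, and the future is polled by hand here, which is roughly what an executor or join_all does for you.

```rust
use std::cell::Cell;
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// A waker that does nothing; enough to poll a future by hand.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Returns (ran_before_poll, ran_after_poll) for a bare async block.
fn observe_laziness() -> (bool, bool) {
    let ran = Cell::new(false);

    // Creating the future does not execute its body.
    let fut = async {
        ran.set(true);
    };
    let before = ran.get();

    // Polling is what actually drives the future forward.
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    let poll = fut.as_mut().poll(&mut cx);
    assert!(matches!(poll, Poll::Ready(())));

    (before, ran.get())
}

fn main() {
    let (before, after) = observe_laziness();
    println!("ran before poll: {before}, ran after poll: {after}");
}
```

The flag is still false after the async block is created and only becomes true once the future is polled. A task passed to tokio::spawn differs in that the runtime starts polling it for you in the background.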


Oh, OK!
Thank you, I get it now!
Have a good day, and thank you both for your help!
