Trying to share a vector between threads, the most efficient way

This is a continuation of my previous thread --> Working with Arc to add "recursive" functionality

I am now trying to share a mutable vector between threads for the first time. I am attempting to use Rayon, as it's worked well for me in the past and made the process easier. But this time I am attempting to do it without performing hard String clone()'ing, by using Arc<Mutex> instead (if there is a better way, I don't know it yet).

Currently I am running into an issue: in order to evaluate whether my loop should continue, I check whether the vector is_empty(). But when the vector is set up as Arc<Mutex<Vec<String>>>, I have to lock and unwrap it to check, and then I cannot proceed, with the code throwing "cannot pass MutexGuard between threads safely":

error[E0277]: `std::sync::MutexGuard<'_, Vec<std::string::String>>` cannot be sent between threads safely
   --> src/main.rs:43:30
    |
43  |                   rayon::scope(move |s| {
    |                   ------------ ^-------
    |                   |            |
    |  _________________|____________within this `[closure@src/main.rs:43:30: 43:38]`
    | |                 |
    | |                 required by a bound introduced by this call
44  | |                     let result = make_request(line.unwrap(), Arc::clone(&cli));
45  | |                     if result != String::from("NONE") {
46  | |                         found_dirs.push(result);
47  | |                     };
48  | |                 });
    | |_________________^ `std::sync::MutexGuard<'_, Vec<std::string::String>>` cannot be sent between threads safely

Can I unlock the Mutex after checking is_empty()? Is that the correct way to do this? With that in mind, do I have to lock/unlock it each time I attempt to add a new item to that vector?

Here is the code for reference:

use clap::Parser;
use std::path::PathBuf;
use http::Uri;
use http::header::COOKIE;
use std::fs::File;
use std::io::{BufRead, BufReader};
use rayon;
use std::sync::{Arc, Mutex};

#[derive(Parser, Debug)]
#[command(author="Andrew", version="0.1.0",
about="directory busting tool.", long_about=None)]
struct Args {
    /// required: The wordlist to use for the busting
    #[arg(short, long, required=true, value_parser=clap::value_parser!(PathBuf))]
    wordlist: PathBuf,
    /// required: The target URL to go against
    #[arg(short, long, required=true, value_parser=clap::value_parser!(Uri))]
    target: Uri,
    /// optional: Cookies to use for authenticated requests
    #[arg(short, long, value_parser=clap::value_parser!(Option<String>))]
    cookie: Option<String>,
    /// optional: Perform the scan as a recursive scan
    #[arg(short, long, default_value_t=false)]
    recursive: bool,
}

fn main() {
    let cli = Arc::new(Args::parse());

    let mut found_dirs = Arc::new(Mutex::new(Vec::new()));
    let mut count = 0;


    loop {
        let word_list = cli.wordlist.to_owned();
        let file = File::open(word_list).expect("Could not read file.");
        let reader = BufReader::new(file);
        let mut found_dirs = found_dirs.lock().unwrap();
        if found_dirs.is_empty() && count == 0 {
            count += 1;
            for line in reader.lines() {
                rayon::scope(move |s| {
                    let result = make_request(line.unwrap(), Arc::clone(&cli));
                    if result != String::from("NONE") {
                        found_dirs.push(result);
                    };
                });
            };
        } else if count == 1 && found_dirs.is_empty() {
            break;
        } else {
            let ext = found_dirs.pop().unwrap();
            for line in reader.lines() {
                rayon::scope(move |s| {
                    let new_line = format!("{}/{}", ext, line.unwrap());
                    make_request(new_line, Arc::clone(&cli));
                });
            }
        }
    }
    println!("Scan complete.");
}

#[tokio::main]
async fn make_request(line: String, args: Arc<Args>) -> String {
    let target = &args.target;
    let cookie = &args.cookie;
    match cookie.is_some() {
        true => {
            let url = format!("{}{}", &target, &line);
            let client = reqwest::Client::new();
            let req_resp = client.get(&url)
                .header(COOKIE, cookie.as_ref().unwrap())
                .send()
                .await
                .unwrap()
                .status()
                .as_u16();
            if req_resp == 200 {
                println!("| {} | {} | {} | => Dir found, adding to list", req_resp, url, &line);
                return line;
            } else {
                println!("| {} | {} | {} |", req_resp, url, &line);
            }
        },
        false => {
            let url = format!("{}{}", &target, &line);
            let client = reqwest::Client::new();
            let req_resp = client.get(&url)
                .send()
                .await
                .unwrap()
                .status()
                .as_u16();
            if req_resp == 200 {
                println!("| {} | {} | {} | => Dir found, adding to list", req_resp, url, &line);
                return line;
            } else {
                println!("| {} | {} | {} |", req_resp, url, &line);
            }
        },
    };
    return String::from("NONE");
}

Researching online, there are examples out there, but they're either super specific to that person's use case or super generic, and it's hard to build something from those examples just to specifically understand it. I feel like I need to see a visual example of the proper process to understand it better in this case. Maybe that is just me =/ and I'm overly fixated on it, but any help is greatly appreciated!

Checking if a vector is empty before every addition and sharing it between threads is a huge red flag. Your code will perform worse than using a single thread. Why do you want to share a Vec between multiple threads to begin with?

hey, did not know that it was a red flag XD.

The primary use case is to have a list of found items that will be used recursively to extend the search.

Right now it runs once to find any directories, and if one specifically returns 200, it's added to a vector; the next iteration pops it out of the vector and continues to scan, going one level deeper. So it starts by looking for anything returning 200 responses from http://example.com/<word_list_word>. Then the second iteration pops out any found words and extends the search, to http://example.com/<word_from_vec>/<word_list_word>

I have checked your code, and there are many other red flags you should be aware of:

  1. You are mixing sync and async code in a bad way. Performing http requests is mostly IO-bound, and you are blocking the thread on each request.
  2. You are declaring both a main function and a #[tokio::main] function. What the #[tokio::main] macro desugars into is something like this:
fn main() {
    tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .unwrap()
        .block_on(async {
            // The body of your make_request function
        })
}

You can see that it will block the thread on every request.

  3. You are not handling errors. You risk missing found directories by not handling the http request errors.

As a first step, what you should do is simply run all the requests concurrently. Rust will already do this for you if you use the async/await syntax. As I said before, you should probably not use multiple threads for this use case since it seems to be IO-bound.


Thank you for this. I have asked a lot of questions out there and nobody has pointed that out to me about tokio. I haven't built any error handling into it yet, as I figured I could add error handling once I get a working version built up.

I was curious why the speed always seemed so slow to me, but it makes sense that I was blocking the thread. I honestly figured that was just the nature of it, because a lot of people out there say to start your build single-threaded to get it working, then make it multi-threaded, etc.

As a first step, what you should do is simply run all the requests concurrently.

How can I do this using the async/await syntax you mention? Do you have specific links to the information? I want to ensure I can see or read about what exactly you're talking about, as it seems I missed a very important item here =/.

Again thank you!

This is the naive conversion of your code to async/await. Note that I added some comments that you should be aware of:

use clap::Parser;
use std::path::PathBuf;
use http::Uri;
use http::header::COOKIE;
use std::fs::File;
use std::io::{BufRead, BufReader};
use rayon;
use std::sync::{Arc, Mutex};

#[derive(Parser, Debug)]
#[command(author="Andrew", version="0.1.0",
about="directory busting tool.", long_about=None)]
struct Args {
    /// required: The wordlist to use for the busting
    #[arg(short, long, required=true, value_parser=clap::value_parser!(PathBuf))]
    wordlist: PathBuf,
    /// required: The target URL to go against
    #[arg(short, long, required=true, value_parser=clap::value_parser!(Uri))]
    target: Uri,
    /// optional: Cookies to use for authenticated requests
    #[arg(short, long, value_parser=clap::value_parser!(Option<String>))]
    cookie: Option<String>,
    /// optional: Perform the scan as a recursive scan
    #[arg(short, long, default_value_t=false)]
    recursive: bool,
}

#[tokio::main]
async fn main() {
    let cli = Arc::new(Args::parse());

    let mut found_dirs = Vec::new();
    let mut count = 0;


    loop {
        let word_list = cli.wordlist.to_owned();
        let file = File::open(word_list).expect("Could not read file.");
        let reader = BufReader::new(file);
        if found_dirs.is_empty() && count == 0 {
            count += 1;
            for line in reader.lines() {
                let result = make_request(line.unwrap(), cli.clone()).await;
                if result != String::from("NONE") {
                    found_dirs.push(result);
                };
            };
        } else if count == 1 && found_dirs.is_empty() {
            // You are only breaking out of the loop if the count is exactly one, which results in an infinite loop
            break;
        } else {
            let ext = found_dirs.pop().unwrap();
            for line in reader.lines() {
                let new_line = format!("{}/{}", ext, line.unwrap());
                // You aren't doing anything with the result of this call
                make_request(new_line, cli.clone()).await;
            }
        }
    }
    println!("Scan complete.");
}

async fn make_request(line: String, args: Arc<Args>) -> String {
    let target = &args.target;
    let cookie = &args.cookie;
    match cookie.is_some() {
        true => {
            let url = format!("{}{}", &target, &line);
            let client = reqwest::Client::new();
            let req_resp = client.get(&url)
                .header(COOKIE, cookie.as_ref().unwrap())
                .send()
                .await
                .unwrap()
                .status()
                .as_u16();
            if req_resp == 200 {
                println!("| {} | {} | {} | => Dir found, adding to list", req_resp, url, &line);
                return line;
            } else {
                println!("| {} | {} | {} |", req_resp, url, &line);
            }
        },
        false => {
            let url = format!("{}{}", &target, &line);
            let client = reqwest::Client::new();
            let req_resp = client.get(&url)
                .send()
                .await
                .unwrap()
                .status()
                .as_u16();
            if req_resp == 200 {
                println!("| {} | {} | {} | => Dir found, adding to list", req_resp, url, &line);
                return line;
            } else {
                println!("| {} | {} | {} |", req_resp, url, &line);
            }
        },
    };
    return String::from("NONE");
}

Yes, sure! Here you go:


Regarding the error message in your initial post: std::sync::MutexGuard is not Send for a good reason - the thread which acquires the lock should also be the one which releases it.

I was able to get a working version and I'm pleasantly surprised by its performance. I really appreciate you telling me that I was set up wrong to begin with!

With that being said, I'm back to the original issue and was curious if you had any insights into how I can get it done. I still need to get a result from the HTTP requests, and if they match a criterion, perform recursive iteration from there. What is the best way to handle a potentially growing collection of items in an async set-up?

There are so many things you need to consider when running http requests concurrently that it would be hard to convey them all in a single message, but I recommend listening to this talk to get a better grasp of the task in question: https://www.youtube.com/watch?v=BIguvia6AvM.

Here's an example of using Lychee with MPSC channels: https://github.com/lycheeverse/lychee/blob/master/examples/client_pool/client_pool.rs.


This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.