Multiprocessing and asynchronous tasks

I want to scrape a website, and for that I was thinking of writing a multi-threaded program, since that would let me scrape multiple pages at the same time. Each response from the website takes about 100 ms, so scraping 120 pages sequentially would take 12 seconds.

Multi-threading on its own only lets me scrape as many pages concurrently as I have threads. In my particular case I'm spawning 12 threads, which lets me scrape 120 pages in 1 second.

I thought that while the program waits for a response it could start fetching more pages and then wait for all of them together. For example, each thread could fetch 10 pages and await the responses asynchronously. In that case, scraping 120 pages would only take about 100 ms.

I wrote the example below; instead of scraping a web page, it runs a function that takes 100 ms to double a given number. I'd like to know whether this code is alright or whether it can be improved, since I'm also doing this to learn Rust.


Cargo.toml

[dependencies]
log = "0.4.13"
env_logger = "0.8.2"
tokio = { version = "1", features = ["full"] }

main.rs

use log::{info, trace};

use std::sync::{Arc, Mutex, mpsc};
use std::collections::VecDeque;
use std::time::Duration;

/// Create a WorkQueue of any type that holds all the work to be done
#[derive(Clone)]
struct WorkQueue<T> {
    queue: Arc<Mutex<VecDeque<T>>>
}


impl<T> WorkQueue<T> {
    /// Create a new empty queue
    fn new() -> Self {
        Self {
            queue: Arc::new(Mutex::new(VecDeque::new()))
        }
    }

    /// Add work to the queue
    fn add_work(&self, work: T) -> Result<(), ()> {
        let queue = self.queue.lock();

        if let Ok(mut q) = queue {
            q.push_back(work);
            Ok(())
        } else {
            Err(())
        }
    }
    
    /// Get the first available work
    fn get_work(&self) -> Option<T> {
        // Lock the queue to fetch a work to do and prevent other threads from
        // fetching the same work.
        let queue = self.queue.lock();

        if let Ok(mut q) = queue {
            // Remove the first work available
            // Follows FIFO order
            q.pop_front()
        } else {
            None
        }
    }

    /// Count the work left
    fn length(&self) -> Option<usize> {
        let queue = self.queue.lock();

        if let Ok(q) = queue {
            Some(q.len())
        } else {
            None
        }
    }
}


/// A very complex calculation that takes too much time to execute
async fn complex_calculation(x: i32, duration: u64) -> i32 {
    trace!("Calculating");

    // Use tokio::time::sleep instead of thread::sleep to avoid blocking the
    // entire thread.
    tokio::time::sleep(Duration::from_millis(duration)).await;
    x * 2
}


async fn create_worker(i: u32,
                       queue_clone: WorkQueue<i32>,
                       max_work_async: i32,
                       tx_clone: mpsc::Sender<i32>) {
    
    // How much work has this thread done
    let mut work_done: i32 = 0;
    let mut current_work: i32 = 0;

    // Check if there is more work to be done
    while queue_clone.length().unwrap() > 0 {
        trace!("Check if there is work available");
        let mut tasks = Vec::new();
        
        while current_work < max_work_async {
            if let Some(work) = queue_clone.get_work() {
                trace!("Get Work");
                let task = tokio::task::spawn(complex_calculation(work, 100));
                tasks.push(task);
                work_done += 1;
                current_work += 1;
            } else {
                break;
            }
        }

        trace!("Wait for tasks to complete");
        for task in tasks {
            let result = task.await.unwrap();
            tx_clone.send(result).unwrap();
        }

        current_work = 0;
    }

    println!("I worked so much... Thread: {:?}, Work Done: {:?}", i, work_done);
}


#[tokio::main]
async fn main() {

    // Don't forget to set the environment variable
    // $env:RUST_LOG="TRACE"
    env_logger::init();

    info!("Start");

    // Store the result of the calculations
    let mut data: Vec<i32> = Vec::new();

    // Create a channel to receive data from the calculations.
    let (tx, rx) = mpsc::channel();

    // Set the maximum number of threads.
    // Create 12 threads as my CPU has 12 logical cores
    let total_threads: u32 = 12;

    // Set a maximum amount of work that a thread can do async
    let max_work_async = 10;

    // Set the amount of work to do
    let work = 120;

    // Keep track of how much work has to be done
    let mut work_remaining = 0;

    // Create a new work queue
    let queue = WorkQueue::new();
    for i in 0..work {
        queue.add_work(i).unwrap();
        work_remaining += 1;
    }

    println!("Work to be done: {:?}", queue.length());

    // Store the handles of all the threads
    let mut handles = Vec::new();

    for i in 0..total_threads {
        let tx_clone = tx.clone();

        // This is just a reference to the queue, as the queue is an Arc<Mutex>
        let queue_clone = queue.clone();

        trace!("Create Worker");
        
        let h = tokio::spawn(create_worker(i, queue_clone, max_work_async, tx_clone));

        handles.push(h);
    }

    trace!("Poll the results");

    // Keep receiving until all the work has been done
    while work_remaining > 0 {
        match rx.recv() {
            Ok(result) => {
                data.push(result);
                work_remaining -= 1;
            },
            Err(_) => {}
        }
        trace!("-----> Work Remaining: {}", work_remaining);
    }
    
    // Make sure all the threads have finished
    for h in handles {
        h.await.unwrap();
    }

    // Check that all work has been done correctly
    let mut total = 0;
    for i in data {
        total += i;
    }

    let mut expected_total = 0;
    for i in 0..work {
        expected_total += i * 2;
    }

    println!("Expected: {:?}, Result: {:?}", expected_total, total);
    info!("End");
}

tokio::spawn doesn't spawn a thread. It spawns an asynchronous task, which is far, far cheaper than a thread, and you can spawn many, many of them without any kind of penalty. You can leave all the task management to Tokio: just run tokio::spawn for as many pages as you wish to scrape and Tokio will manage the rest for you, no WorkQueue or anything else needed. If you want to limit the number of requests occurring at once, you should use buffered or buffer_unordered from the futures crate's StreamExt trait.
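
For example, here's a rough sketch of the buffer_unordered approach. Note that this assumes you add futures = "0.3" to the dependencies in Cargo.toml alongside tokio; complex_calculation stands in for the same 100 ms doubling function from your post:

use futures::stream::{self, StreamExt};
use std::time::Duration;

async fn complex_calculation(x: i32) -> i32 {
    // Stand-in for an HTTP request that takes ~100 ms.
    tokio::time::sleep(Duration::from_millis(100)).await;
    x * 2
}

#[tokio::main]
async fn main() {
    // Turn 0..120 into a stream of futures and poll at most 12 of them
    // at a time. Results are yielded in completion order.
    let results: Vec<i32> = stream::iter(0..120)
        .map(complex_calculation)
        .buffer_unordered(12)
        .collect()
        .await;

    let total: i32 = results.iter().sum();
    println!("Total: {}", total);
}

If you raise the limit to 120 (or spawn every task up front), all the waits overlap and the whole run finishes in roughly 100 ms, which is exactly the behaviour your WorkQueue was aiming for.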


@Kestrer is totally right on this: the default Tokio executor handles both concurrent and parallel execution, and the default thread count for the multi-threaded executor is the number of CPUs.

What helped me understand the options in more detail is this Stack Overflow answer about parallel HTTP requests.
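
For completeness, here's a minimal sketch of setting that thread count by hand instead of relying on the CPU-count default (this assumes tokio 1.x with the "full" feature set, as in the Cargo.toml above):

fn main() {
    // Build the multi-threaded runtime explicitly instead of via #[tokio::main].
    let rt = tokio::runtime::Builder::new_multi_thread()
        .worker_threads(12) // defaults to the number of logical CPUs
        .enable_all() // enable the timer and I/O drivers
        .build()
        .unwrap();

    rt.block_on(async {
        // spawn tasks here exactly as before
    });
}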
