Sharing / Sending Data between Tasks within an Axum Framework

Hi,

I am trying to create a micro-service in Rust; currently I am using Axum. This micro-service only takes one type of request (a POST request with JSON). However, the service is extremely IO intensive: I am web-scraping from one particular source, which means I have to sleep for x seconds before making another request!

I am allocating the scraping to a specific task that runs concurrently with other tasks. Some of those other tasks essentially just wait for and receive the POST request. Note that I respond to the POST request pretty much immediately with an "estimation" of when their task will be completed. Now, since I am scraping from one source, I don't want to scrape every single time a POST request is made (I am using selenium + reqwest), hence why I am allocating a specific task to do it. But the problem is: how do I share / send data from a short-lived function / task (the POST handler) to a long-lived task (the scraping thread)?

Here is my code; any suggestions would be helpful, related or unrelated.

use std::net::SocketAddr;
use std::sync::Arc;
use axum::{routing::{get, post}, http::StatusCode, response::IntoResponse, Json, Router, Extension};
use serde::{Deserialize, Serialize};
use tokio::sync::Mutex;

#[derive(Deserialize, Debug, Clone)]
pub struct ScrapeRequest {
    pub name: String,
    pub phone_number : String, 
    pub email : String, 
    pub notify_flag : String,
    pub scrape_names : Vec<String>
}

async fn scrape_thread() {
    let x: Arc<Mutex<i32>> = Arc::new(Mutex::new(1));
    tokio::join!(
        {
            let count = x.clone();
            async move {
                loop {
                    // Let this be the receiving task which waits for data updates from scraping_request
                    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
                    let mut k = count.lock().await;
                    println!("A: Thread Counter : {}", k); 
                    *k += 10;
                }
            }
        },
        {
            let count = x.clone();
            async move { 
                loop {
                    // Let this be the "scraping" task which actually does the scraping. 
                    tokio::time::sleep(tokio::time::Duration::from_millis(990)).await;
                    let mut k = count.lock().await;
                    println!("B: Thread Counter : {}", k); 
                    *k += 1;
                }
            }
        }              
    );    
}

// This function here should be able to send data across to scrape_thread
async fn scraping_request(Json(scrape_req): Json<ScrapeRequest>) -> String {
    let data: ScrapeRequest = scrape_req; 
    println!("{:?}", data);
    return data.name;
}

#[tokio::main]
async fn main() {
    //scraping_request is the only request I'd have to handle. 
    let scrape_route: Router = Router::new().route(
        "/scrape", 
        post(scraping_request),
    );
    let addr = SocketAddr::from(([127,0,0,1], 8000)); 
    
    // Run these two tasks concurrently.
    tokio::join!(

            scrape_thread(), 

            async {
                println!("Listening!"); 
                axum::Server::bind(&addr)
                .serve(scrape_route.into_make_service())
                .await
                .unwrap();
            }
            
        );
}

You can use the tokio::sync::mpsc module to send and receive messages between async tasks. Here's a simple sketch of how you might start using that in your code:

use axum::{
    extract::State,
    routing::post,
    Json, Router,
};
use serde::{Deserialize, Serialize};
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::sync::{
    mpsc::{UnboundedReceiver, UnboundedSender},
    Mutex,
};

#[derive(Deserialize, Debug, Clone)]
pub struct ScrapeRequest {
    pub name: String,
    pub phone_number: String,
    pub email: String,
    pub notify_flag: String,
    pub scrape_names: Vec<String>,
}

async fn scrape_thread(mut receiver: UnboundedReceiver<ScrapeRequest>) {
    let x: Arc<Mutex<i32>> = Arc::new(Mutex::new(1));
    tokio::join!(
        {
            let count = x.clone();
            async move {
                loop {
                    // Let this be the receiving task which waits for data updates from scraping_request
                    let req = receiver.recv().await.unwrap();
                    let mut k = count.lock().await;
                    println!("A: Request: {:?} Thread Counter : {}", req, k);
                    *k += 10;
                }
            }
        },
        {
            let count = x.clone();
            async move {
                loop {
                    // Let this be the "scraping" task which actually does the scraping.
                    tokio::time::sleep(tokio::time::Duration::from_millis(990)).await;
                    let mut k = count.lock().await;
                    println!("B: Thread Counter : {}", k);
                    *k += 1;
                }
            }
        }
    );
}

// This function here should be able to send data across to scrape_thread
async fn scraping_request(
    State(sender): State<UnboundedSender<ScrapeRequest>>,
    Json(scrape_req): Json<ScrapeRequest>,
) -> String {
    let data: ScrapeRequest = scrape_req;
    println!("{:?}", data);
    let name = data.name.clone();

    sender.send(data).unwrap();
    return name;
}

#[tokio::main]
async fn main() {
    let (sender, receiver) = tokio::sync::mpsc::unbounded_channel::<ScrapeRequest>();
    //scraping_request is the only request I'd have to handle.
    let scrape_route = Router::new().route("/scrape", post(scraping_request));
    let addr = SocketAddr::from(([127, 0, 0, 1], 8000));

    // Run these two tasks concurrently.
    tokio::join!(scrape_thread(receiver), async {
        println!("Listening!");
        axum::Server::bind(&addr)
            .serve(scrape_route.with_state(sender).into_make_service())
            .await
            .unwrap();
    });
}

The scraper task takes a receiver to the channel, and the axum server stores a sender in its app state. When the server gets a request, it can send the request along to the receiver that the scraper task is waiting on.
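
One caveat with the sketch above: receiver.recv().await.unwrap() will panic once every sender has been dropped. That shouldn't happen here, since the sender lives in the server's state for the life of the program, but a more forgiving receive loop (just a sketch; receive_loop is a made-up name) could look like this:

use tokio::sync::mpsc::UnboundedReceiver;

// Sketch: recv() returns None once every UnboundedSender has been
// dropped, so the loop can end cleanly instead of panicking.
async fn receive_loop(mut receiver: UnboundedReceiver<ScrapeRequest>) {
    while let Some(req) = receiver.recv().await {
        println!("A: Request: {:?}", req);
        // hand the request over to the scraping side here
    }
    println!("All senders dropped; stopping the receive loop.");
}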

Ah, I see, so when you do the line .serve(scrape_route.with_state(sender).into_make_service()), does this create a clone of sender? Can it do that for multiple requests? Also, just a general question: what happens if two requests are sent to the POST endpoint at nearly the same time? Will tokio instinctively open up another thread to handle that second request?

Yes, state has to implement Clone in axum, and I believe it is cloned for every connection.
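
For example, if the state ever needs to hold more than just the sender, a common pattern is to bundle everything in one struct that derives Clone (the AppState name below is made up for the sketch):

use tokio::sync::mpsc::UnboundedSender;

// Sketch: axum clones the whole state per connection, so every field
// should be cheap to clone. An UnboundedSender is just a handle; all
// of its clones feed the same channel.
#[derive(Clone)]
struct AppState {
    sender: UnboundedSender<ScrapeRequest>,
}

// The handler would then take State<AppState> and call
// state.sender.send(data) exactly as before.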

I'm not entirely sure what you mean here. For one thing, you aren't working directly with threads at all; you're working with tasks and futures. Tokio will run your tasks on its runtime threads, of which there are a fixed number.
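
For reference, the size of that thread pool is configurable on the runtime; a tiny sketch (the worker count here is arbitrary):

// Sketch: Tokio's multi-threaded runtime defaults to one worker thread
// per CPU core, but the count can be pinned explicitly if needed.
#[tokio::main(flavor = "multi_thread", worker_threads = 4)]
async fn main() {
    // tasks spawned here are scheduled across the 4 worker threads
}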

Two requests that come in at the same time can definitely be processed in parallel, depending on CPU load at the time. However, the scrape_thread future will only process a single request sent over the channel at a time.
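
If the backlog ever matters (requests arriving faster than the scraper can drain them), one option, purely as a sketch, is to create the channel with tokio::sync::mpsc::channel::<ScrapeRequest>(32) instead of unbounded_channel, so the handler can push back when the queue is full. The handler side might then look something like this (scraping_request_bounded and the capacity of 32 are made up for the example):

use axum::{extract::State, Json};
use tokio::sync::mpsc::Sender;

// Sketch: with a bounded Sender, try_send() fails immediately when the
// queue is full, which can be turned into a "busy, try later" reply
// instead of letting requests pile up without limit.
async fn scraping_request_bounded(
    State(sender): State<Sender<ScrapeRequest>>,
    Json(scrape_req): Json<ScrapeRequest>,
) -> String {
    let name = scrape_req.name.clone();
    match sender.try_send(scrape_req) {
        Ok(()) => name,
        Err(_) => "scraper is busy, try again later".to_string(),
    }
}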

Ah yep, sorry, that's right, I should think of these in terms of "tasks". Thanks for the clarification!
