Crates to use for worker processes

I'm building my first webserver in Rust, having come from Python.

In Python, whenever I had jobs I wanted to outsource to workers, I would use rq and rq-scheduler. By jobs I mean anything that doesn't return to the user immediately (e.g. a job that calculates something substantial and stores it in the DB) or anything that runs on a schedule (e.g. daily payment collections).

Which crates should I look at for similar functionality in Rust? I'd favor simplicity over being full-featured.

And a related question: my webserver is fully async (actix-web and sqlx), which made me wonder whether I even need a "worker" or whether I can achieve the same effect with async tasks. I'm new to async programming, so any advice would be greatly appreciated.

Thanks!

It depends on what you are using the worker for.

Python doesn't work well for compute-heavy tasks spread across multiple threads because the GIL essentially serializes everything, but Rust's threads are plain OS threads, so you can run as much work in the background as you want. There are libraries that provide a thread-pool abstraction for exactly this. If you are using an async framework, it's usually enough to use its spawn() function (for async jobs) or spawn_blocking() (for sync jobs) to make sure the task runs on its thread pool.

Alternatively, if you want to use workers to distribute tasks among multiple computers, you can usually use a client for a standard message broker such as RabbitMQ or Redis (which has pub/sub capabilities) and follow the same patterns you would in Python.


Your web server is almost certainly using an async runtime under the hood. To spawn a background task, you just spawn a task on that runtime; typically that would be the tokio::spawn function. For doing expensive computations in async applications, you should read this article, which explains how you can do that.

Hey @alice - thanks for the reply, and for the article!

So your answer seems to suggest I don't need a worker at all? You mention tokio::spawn which is simply an async event loop and the article talks about multiple ways to spawn threads - but neither mentions workers.

To give a bit more context, the task I need to run pulls 150,000 tweets from Twitter and saves them to the database (so it's IO bound).

How would you rank the 3 options (async tasks, threads, workers) from best to worst and why?

Thanks a ton.

I'm not sure what you mean by worker, but if the operation is IO bound, you should spawn one or more async tasks and do it there.

Thanks @Michael-F-Bryan.

My task is pulling 150,000 tweets from Twitter and saving them to a DB. I'm leaning towards a worker: I'm thinking I should spawn a new worker process AND have it run an async loop while all the API calls to Twitter are made.

Is that how you would do it?

Hm, what I mean by worker is a whole separate process with its own memory (unlike a thread). Think about running a separate container in docker-compose or Kubernetes - I would call that a worker.

That's what python-rq that I mentioned in the original post gives you. You start a separate process from the command line and communicate with it via a queue of events in Redis.

There's no point in spawning a new process. It only makes sense if you are using a language like Python with a GIL that makes single-process multithreading infeasible, or if you need to sandbox it in some way.


See that is so interesting! I did not realize that. Again, beginner here, so pardon any ignorance.

Question then: is it possible to run an entire async event loop on a separate thread? Let's say I need to make 1500 calls to Twitter that each return 1000 tweets. Can I spawn 1 thread and run 1500 calls in an async event loop on that thread? Is that how you'd do it?

Yes, you can separate them to their own thread if you wish. To do this, you would spawn a thread with std::thread::spawn, create a tokio::runtime::Runtime in current_thread mode and spawn all the stuff on that runtime.

That said, assuming your web server is also using a runtime, you can also just run both on the same runtime. This would be my first approach unless some reason for separating them comes up.


@alice thanks for all the replies so far. I went ahead and implemented both options using the clokwerk library I found online:

1) using the same async runtime on the main thread

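use std::time::Duration;
use clokwerk::{AsyncScheduler, TimeUnits}; // TimeUnits provides .seconds()

#[actix_web::main]
pub async fn main() {
    // pg_pool2: Arc<PgPool> and config2: Arc<Settings> are created earlier (not shown).
    let mut scheduler = AsyncScheduler::new();

    scheduler.every(10.seconds()).run(move || {
        let arc_pool = pg_pool2.clone();
        let arc_config = config2.clone();
        async move {
            pull_from_main(arc_pool, arc_config).await;
        }
    });

    tokio::spawn(async move {
        loop {
            scheduler.run_pending().await;
            tokio::time::sleep(Duration::from_millis(100)).await;
        }
    });

    // ... start the actix-web HttpServer here and await it; if main returned
    // at this point, the runtime would shut down and cancel the task above.
}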

and 2) using a separate thread:

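use std::{sync::Arc, thread, time::Duration};
use clokwerk::{AsyncScheduler, TimeUnits}; // TimeUnits provides .seconds()

#[actix_web::main]
pub async fn main() {
    // pg_pool2 and config2 are created earlier (not shown).
    // The background job gets its own OS thread with its own Tokio runtime.
    let handle = thread::spawn(move || {
        let rt = tokio::runtime::Builder::new_multi_thread()
            .enable_all()
            .build()
            .unwrap();
        rt.block_on(async {
            pull_tweets2(pg_pool2, config2).await;
        });
    });

    // ... start the actix-web HttpServer here and await it.
}

pub async fn pull_tweets2(pg_pool2: Arc<PgPool>, config2: Arc<Settings>) {
    let mut scheduler = AsyncScheduler::new();

    scheduler.every(10.seconds()).run(move || {
        let arc_pool = pg_pool2.clone();
        let arc_config = config2.clone();
        async move {
            pull_from_main(arc_pool, arc_config).await;
        }
    });

    loop {
        scheduler.run_pending().await;
        tokio::time::sleep(Duration::from_millis(100)).await;
    }
}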

How should I think about comparing these two options? Code clarity-wise the first one is slightly better, but what about efficiency? Is there a way for me to compare the two? Are there tools / crates I could take a look at?

Thanks a ton in advance.

I would prefer the first version. There's no obvious reason why you need a separate runtime, so I wouldn't do that.

Some comments:

  • You would generally use the current_thread scheduler for stuff like the second version. Otherwise you are spawning quite a few threads.
  • You don't need the clokwerk crate. Tokio's time module has things you could use to do the same thing.
  • The fact that clokwerk requires you to loop with a sleep like that actually means that using Tokio directly would be more efficient since you don't wake every 100 ms if there's nothing to do.

Very interesting. Is this how I'd do it?

use std::time::Duration;
use tokio::time;

tokio::spawn(async move {
    let mut interval = time::interval(Duration::from_secs(5));
    loop {
        pull_from_main(pg_pool2.clone(), config2.clone()).await;
        interval.tick().await;
    }
});

You usually put the tick first because the first tick happens immediately, but otherwise yes.


So I'm actually struggling with exactly that. My function now looks like this:

tokio::spawn(async move {
    let mut interval = time::interval(Duration::from_secs(5));
    loop {
        println!("1/4 waiting");
        interval.tick().await;

        println!("2/4 calling fn1");
        some_async_fn1().await;

        println!("3/4 calling fn2");
        some_async_fn2().await;

        println!("4/4 calling fn3");
        some_async_fn3().await;
    }
});

And this is what I see in the output when I start the runtime:

1/4 waiting
2/4 calling fn1
3/4 calling fn2
4/4 calling fn3
1/4 waiting
//then it pauses

So on start it runs the first iteration without waiting for the interval. Why might that be?

My workaround so far has been to add:

let mut first_run = true;
loop {
    if first_run {
        first_run = false;
        interval.tick().await;
        continue;
    }

But that can't be the best way...

That if statement is overly complicated. You can just add a call to tick before starting the loop.


No idea why I didn't think of adding one before the loop... Thanks a ton again.

Final code for anyone reading this thread:

use std::time::Duration;
use tokio::time;

tokio::spawn(async move {
    let mut interval = time::interval(Duration::from_secs(5));
    // The first tick completes immediately, so consume it before the loop.
    interval.tick().await;
    loop {
        println!("1/4 waiting");
        interval.tick().await;

        println!("2/4 calling fn1");
        some_async_fn1().await;

        println!("3/4 calling fn2");
        some_async_fn2().await;

        println!("4/4 calling fn3");
        some_async_fn3().await;
    }
});

Works like a charm.

