Any async-task-queue library/crate?

Hi, I just started learning Rust this year.

I am trying to build a multi-device web application (it runs on the web and on Android) where users expect their actions to run sequentially, not in parallel.

So in this very specific case, I need an asynchronous task queue. Correct me if I am wrong, as I am new to system design too, but I am strongly confident it is the only solution.

Are there any libraries/crates that provide an asynchronous task queue?

Any help is welcome.
Regards, @thebluetropics from Indonesia.

When you say multi-device, do you mean that the queue spans many devices (i.e. is probably stored on a central server/service) or merely that the application targets multiple platforms, and each instance has a local queue? If the latter, does the queue persist across the application quitting/restarting?


Yes, it is stored on a central server/service. Basically, it is a first-in, first-out queue (i.e., a client request that arrives earlier is processed earlier).

Let's say there are two devices (clients), Client A and Client B, that each send a request (we call it an "action") to the server.

Let's say Client B's action arrives at the server first and takes about 24 ms to complete. If Client A then sends an action, it must wait those 24 ms before its own action starts.

Different users on the server do not affect one another.

I don't think anything else is involved. All I need is an asynchronous task queue, similar to the queue in the Async JavaScript library.

Thanks for any help.

I also asked whether the queue needs to be persistent across restarts.

If not, then all you need is an async channel and an async task which takes items from the receiving side of the channel and executes them. If so, then I don't have any specific recommendation.
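
A minimal sketch of that shape, using std threads and `std::sync::mpsc` as a stand-in for the async channel and task so that it runs without extra crates. The names `Job`, `spawn_worker`, and `run_in_order` are made up for illustration. With an async runtime the structure would presumably be the same, with an async channel and a spawned task in place of the thread.

```rust
use std::sync::mpsc::{channel, Sender};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

// Hypothetical job type: any one-shot closure that can be sent to the worker.
type Job = Box<dyn FnOnce() + Send + 'static>;

// Starts a single worker that runs jobs strictly in arrival order.
fn spawn_worker() -> (Sender<Job>, JoinHandle<()>) {
    let (tx, rx) = channel::<Job>();
    let handle = thread::spawn(move || {
        // The loop ends once every Sender has been dropped.
        for job in rx {
            job();
        }
    });
    (tx, handle)
}

// Enqueues three jobs and returns the order in which they ran.
fn run_in_order() -> Vec<u32> {
    let log = Arc::new(Mutex::new(Vec::new()));
    let (tx, handle) = spawn_worker();
    for id in 1..=3 {
        let log = Arc::clone(&log);
        tx.send(Box::new(move || log.lock().unwrap().push(id))).unwrap();
    }
    drop(tx); // close the queue so the worker can finish
    handle.join().unwrap();
    let order = log.lock().unwrap().clone();
    order
}
```

Calling `run_in_order()` from `main` returns the job ids in the order they were sent, because a single worker drains the channel one item at a time.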


My bad.

No, it doesn't have to be persistent across restarts. If that changes, I think I can implement persistence myself.

Can it send futures with different Output types?

For example:

async fn example_future_0() -> () {}
async fn example_future_1() -> String { String::new() }

Can I send both futures to the same channel?

You would need an enum or a trait object.


Can I have an example, please?

One way would be to set it up so that all of the enqueued tasks have no return value, but instead send their result back along a oneshot channel. Something like this (untested, probably has lots of errors):

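use std::future::Future;
use futures::channel::oneshot;
use futures::future::{BoxFuture, FutureExt};
// Assumption: tokio's unbounded channel, whose send() is synchronous.
use tokio::sync::mpsc::UnboundedSender;

#[derive(Clone)]
struct QueueSender(UnboundedSender<BoxFuture<'static, ()>>);

impl QueueSender {
    fn enqueue<T: Send + 'static>(
        &self,
        task: impl Future<Output = T> + Send + 'static,
    ) -> oneshot::Receiver<T> {
        let (tx, rx) = oneshot::channel();
        // Wrap the task so every queued future has Output = ().
        let _ = self.0.send(async move {
            let _ = tx.send(task.await);
        }.boxed());
        rx
    }
}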
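
For comparison, the enum route mentioned earlier might look like the sketch below: each queued future is wrapped so that it resolves to one shared enum, which lets futures with different outputs share a single boxed type. The names `TaskOutput`, `QueuedTask`, `build_queue`, and `run_queue` are assumptions made for this example, and the tiny busy-polling `block_on` helper only stands in for a real async runtime.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical enum covering every result type the queue can produce.
#[derive(Debug, PartialEq)]
enum TaskOutput {
    Unit,
    Text(String),
}

// One boxed type for all queued futures, regardless of the original output.
type QueuedTask = Pin<Box<dyn Future<Output = TaskOutput> + Send + 'static>>;

async fn example_future_0() {}

async fn example_future_1() -> String {
    String::from("hello")
}

// Wrap each future so that it resolves to the shared enum.
fn build_queue() -> Vec<QueuedTask> {
    let mut queue: Vec<QueuedTask> = Vec::new();
    queue.push(Box::pin(async {
        example_future_0().await;
        TaskOutput::Unit
    }));
    queue.push(Box::pin(async { TaskOutput::Text(example_future_1().await) }));
    queue
}

// Waker that does nothing; enough for futures that never actually wait.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Minimal stand-in for a runtime: polls one future until it is ready.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return value;
        }
    }
}

// Runs the queued futures one after another, in order.
fn run_queue() -> Vec<TaskOutput> {
    build_queue().into_iter().map(block_on).collect()
}
```

The trade-off compared with the oneshot approach is that the caller has to match on `TaskOutput` to get the concrete value back, and every new result type means a new enum variant.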

Hi @2e71828

Is there any performance overhead (that I should worry about) from using a channel for responses instead of a regular asynchronous function?

For example, I am planning for my application to serve 1024 users. If each user has their own queue with a maximum of 8 tasks, would I need up to 1024*8 oneshot::channel instances?

Anyway, thank you for giving me the solution, I can continue building my application now.


There's probably some overhead¹, but I doubt it'll be noticeable at the scale you're talking about. The only way to know for sure is to benchmark it.

¹ From, e.g., at least two heap allocations: one for the boxed future and another for the oneshot channel.


I'll benchmark once I have more users, but for now I don't think I need to worry about it.

I don't really understand what the problem is here. I've been reading several articles about lifetimes, but I still have no clue:

use std::future::Future;
use tokio::sync::mpsc::Sender;
use futures::channel::oneshot;
use tokio::sync::mpsc;
use futures::FutureExt;
use futures::future::BoxFuture;

pub struct Queue<'a> {
  tx: Sender<BoxFuture<'a, ()>>
}

impl<'a> Queue<'a> {
  fn new(sz: usize) -> Queue<'a> {
    let (tx, mut rx) = mpsc::channel::<BoxFuture<()>>(sz);
    
    tokio::spawn(async move {
      match rx.recv().await {
        Some(f) => f.await,
        None => panic!()
      }
    });

    Queue { tx }
  }
}
lifetime may not live long enough
requirement occurs because of the type `Queue<'_>`, which makes the generic argument `'_` invariant
the struct `Queue<'a>` is invariant over the parameter `'a`
see <https://doc.rust-lang.org/nomicon/subtyping.html> for more information about variance
main.rs(12, 6): lifetime `'a` defined here

Your solution works when I try it without a struct; there I managed to resolve all the compiler errors.


For this application, the lifetime in BoxFuture should be 'static, that is,

pub struct Queue {
  tx: Sender<BoxFuture<'static, ()>>
}

This means that the future cannot borrow anything (that is shorter-lived than "forever"), which is necessary for a queue like this because the queue will live longer than any individual task's inputs.
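
As a small illustration of that restriction, the sketch below builds a future that owns its input by moving a `String` into an `async move` block, which is what lets it satisfy the `'static` bound. The names `StaticFuture`, `make_task`, `assert_static`, `poll_once`, and `build_and_poll` are hypothetical, and the commented-out lines show the kind of borrow that would be rejected.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Same shape as BoxFuture<'static, T>, spelled out with std types only.
type StaticFuture<T> = Pin<Box<dyn Future<Output = T> + Send + 'static>>;

// Hypothetical task constructor: the future takes ownership of `name`.
fn make_task(name: String) -> StaticFuture<usize> {
    // `async move` moves `name` into the future, so nothing is borrowed.
    Box::pin(async move { name.len() })
}

// Compiles only for types that satisfy the 'static bound.
fn assert_static<T: 'static>(_value: &T) {}

// Waker that does nothing; enough for a future that is ready immediately.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Polls a future once and returns its value if it is already finished.
fn poll_once<T>(mut fut: StaticFuture<T>) -> Option<T> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    match fut.as_mut().poll(&mut cx) {
        Poll::Ready(value) => Some(value),
        Poll::Pending => None,
    }
}

fn build_and_poll() -> Option<usize> {
    let input = String::from("client-a");
    // This would NOT compile, because the future would borrow a local:
    // let borrowed = &input;
    // let task: StaticFuture<usize> = Box::pin(async move { borrowed.len() });
    let task = make_task(input);
    assert_static(&task);
    poll_once(task)
}
```

Here `build_and_poll()` yields `Some(8)`, the length of the owned string, and the future remains valid after `input` has been moved out of the enclosing function's scope.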


Hi @kpreid

Thank you for telling me the solution. I really appreciate that.
I am currently still learning about lifetimes.

TL;DR: the Queue should not be 'static; the Queue object should live only as long as the user remains connected to the server.

I am sorry, but my queue will not be 'static. Or, shouldn't be 'static to be exact...

The reason is that each user has their own Queue object. The Queue is created when the server establishes a websocket connection with the user, and it lives as long as the user remains connected to the server. A user can have up to 2 websocket connections at a time and cannot create more than that. (There's another reason why I limit them.)

That's it. I can't use 'static for this reason.

Note: While I am trying to solve this problem in my web application, I keep learning more about Rust. From this thread alone, I have learned a lot about lifetimes, thanks to all of you.

(I'm not following this thread closely, but) you may be misinterpreting what 'static means in this case. As @kpreid said, it just means that values of that type cannot contain any short-lived borrows. It doesn't mean that Queue values last forever or are never destructed. That's a common misconception. The 'static here means that the future satisfies a 'static bound.

A 'static bound conveys a property of the type, not of values of that type per se. The only thing a 'static bound says about the liveness of values is that they could be arbitrarily long.
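
To make that concrete, here is a small sketch (the `Connection` type and the helper functions are invented for illustration) of a value that satisfies a `'static` bound and is nevertheless dropped long before the program ends, much as a per-connection queue would be dropped when its websocket closes.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

// Hypothetical per-connection state; it owns all of its data.
struct Connection {
    user: String,
    dropped: Arc<AtomicBool>,
}

impl Drop for Connection {
    fn drop(&mut self) {
        // Record that the destructor ran.
        self.dropped.store(true, Ordering::SeqCst);
    }
}

// Accepts only types that contain no short-lived borrows.
fn requires_static<T: 'static>(value: T) -> T {
    value
}

// Returns whether the connection was dropped at the end of the inner scope.
fn dropped_after_scope() -> bool {
    let dropped = Arc::new(AtomicBool::new(false));
    {
        let conn = requires_static(Connection {
            user: String::from("client-a"),
            dropped: Arc::clone(&dropped),
        });
        assert_eq!(conn.user, "client-a");
    } // `conn` is destroyed here even though `Connection: 'static` holds
    dropped.load(Ordering::SeqCst)
}
```

`dropped_after_scope()` returns `true`: the bound only rules out short-lived borrows inside the type, it does not keep any particular value alive.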


Hi @quinedot

Thank you for pointing out my misconception about the 'static lifetime. I had always thought that 'static meant being valid for the entire program (because that is what static means, at least in C and C++).


Thank you, everyone!
I have managed to build the Queue successfully:

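use std::future::Future;
use futures::channel::oneshot;
use futures::future::{BoxFuture, FutureExt};
use tokio::sync::mpsc;

struct Queue {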
  tx: mpsc::Sender<BoxFuture<'static, ()>>
}

impl Queue {
  fn new(sz: usize) -> Queue {
    let (tx, mut rx) = mpsc::channel(sz);

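    tokio::spawn(async move {
      // Run tasks one at a time. recv() returns None once every Sender is
      // dropped, so the worker stops there instead of spinning forever.
      while let Some(f) = rx.recv().await {
        f.await;
      }
    });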

    Queue { tx }
  }

  async fn enqueue<T: Send + 'static>(&self, f: impl Future<Output = T> + Send + 'static) -> T {
    let (tx, rx) = oneshot::channel::<T>();

    let _ = self.tx.send(async move {
      let v = f.await;
      let _ = tx.send(v);
    }.boxed()).await;

    rx.await.unwrap()
  }
}

I don't quite understand what's happening though...
I will post the queue code in a code review thread for further improvements!