Asynchronous multi-consumer queue?

I have a need to implement an async task pool in which a number of worker tasks pop job inputs from a queue of some sort and operate on them in a loop. As an added wrinkle, the queue is simultaneously being pushed to by other tasks in a separate pool. What's the best data structure I should use for the queue?

  • Would a plain Arc<Mutex<VecDeque<Input>>> likely be sufficient?

  • Should I use a work-stealing queue? Some brief searching on lib.rs didn't turn up anything that looked usable with tokio. Suggestions?

  • Should I use some async multi-producer, multi-consumer, non-broadcast channel? I'm not aware of any. Would such a type likely just use work-stealing underneath?

  • Other suggestions?

2 Likes

I recommend using tokio::sync::Semaphore and doing a tokio::spawn call per job. See this example.

3 Likes

That's what I'm currently doing, but the task futures are very large, and there are a lot of individual tasks, so I keep running into memory issues.

The example I linked acquires the permit before spawning, which should avoid creating the large future before it's ready to run.

I don't think a blocking spawn will work in my case without bending a bunch of things over backwards. Thanks anyway.