Design of an Async Queue

Hi! I'm trying to write a simple async queue, but I ran into trouble when dealing with inputs that contain references:

  1. I want to spawn a task that takes a reference as input (let's call its lifetime 'a).
  2. I'm sure that the lifetime of the async queue ('b) is shorter than 'a (that is, 'a: 'b).
  3. However, any tokio::spawn-like operation requires the references to be 'static.
  4. So I have to transmute the inputs, because I'm sure the inputs ('a) will live until the queue ('b) is completely cleaned up.

What I'm struggling with: can I avoid the transmute here? Or am I missing something, such that even with 'a: 'b the sequence above leads to UB and I cannot do this safely?

I say UB, not bugs, because I'm already using transmute and everything seems to be working smoothly...

Here's the (pseudo) code I've abstracted out; feel free to skip it if you already know what my problem is:

impl<T, R> AsyncQueue<T, R>
where
    T: Send + 'static,
    R: Send + 'static,
{
    // what should I do if `T` here contains references? (e.g., &'a [f32])
    pub fn submit(&mut self, id: usize, data: T) {
        let worker = Arc::clone(&self.worker);
        let results = Arc::clone(&self.results);
        self.rt.spawn(async move {
            let result = worker.read().unwrap().process(data);
            results.lock().unwrap().insert(id, result);
        });
    }

    pub fn pop(&self, id: usize) -> Option<Result<R>> {
        self.results.lock().unwrap().remove(&id)
    }
}

Thanks in advance!

Your code is almost certainly wrong in the face of cancellation. If submit gets cancelled, you will have a use-after-free.

Edit: this reply is wrong ... you are not making the standard mistake

Ah, maybe I simplified away too many details!

When I say 'clean up' in step 4, I mean I'll wait for all in-queue tasks to finish, which is not shown in my pseudocode for simplicity.

Is it still wrong given this?

Please share the fields of AsyncQueue.

No problem! Here's the full code, which compiles:

use anyhow::Result;
use std::{
    collections::HashMap,
    sync::{Arc, Mutex, RwLock},
};
use tokio::{
    runtime::{Builder, Runtime},
    task::JoinHandle,
};

pub fn init_rt(num_threads: usize) -> Result<Runtime> {
    let rt = if num_threads <= 1 {
        Builder::new_current_thread().enable_all().build()?
    } else {
        Builder::new_multi_thread()
            .worker_threads(num_threads)
            .enable_all()
            .build()?
    };
    Ok(rt)
}

pub trait Worker<T, R>: Send + Sync
where
    R: Send,
{
    fn process(&self, cursor: usize, data: T) -> Result<R>;
}

pub struct AsyncQueue<T, R>
where
    T: Send,
    R: Send,
{
    rt: Runtime,
    worker: Arc<RwLock<Box<dyn Worker<T, R>>>>,
    results: Arc<Mutex<HashMap<usize, Result<R>>>>,
    pending: Vec<JoinHandle<()>>,
}
impl<T, R> AsyncQueue<T, R>
where
    T: Send + 'static,
    R: Send + 'static,
{
    pub fn new(worker: Box<dyn Worker<T, R>>, num_threads: usize) -> Result<Self> {
        Ok(Self {
            rt: init_rt(num_threads)?,
            worker: Arc::new(RwLock::new(worker)),
            results: Arc::new(Mutex::new(HashMap::new())),
            pending: Vec::new(),
        })
    }

    pub fn submit(&mut self, cursor: usize, data: T) {
        let worker = Arc::clone(&self.worker);
        let results = Arc::clone(&self.results);
        let handle = self.rt.spawn(async move {
            let result = worker.read().unwrap().process(cursor, data);
            results.lock().unwrap().insert(cursor, result);
        });
        self.pending.push(handle);
    }

    pub fn pop(&self, cursor: usize) -> Option<Result<R>> {
        self.results.lock().unwrap().remove(&cursor)
    }

    pub fn reset(&mut self) -> Result<()> {
        use anyhow::Ok;

        self.results.lock().unwrap().clear();
        self.pending.drain(..).try_for_each(|handle| {
            handle.abort();
            self.rt.block_on(handle)?;
            Ok(())
        })?;
        Ok(())
    }
}

So by 'clean up', I mean calling the reset method of AsyncQueue.

It happens to be the case that the destructor of Runtime does approximately the same as reset.

Anyway, it's possible that your code uses AsyncQueue in a way that does not trigger UB, but good Rust API design generally says that users of your API must not be able to trigger UB in any way using only safe methods. The problem with your transmute is that the user can mem::forget your AsyncQueue: its destructor then never runs, so nothing waits for the spawned tasks, and they can keep using the transmuted references after 'a has ended. You can read about it in the nomicon or this old (more informal) article.
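
To make the mem::forget point concrete, here is a minimal sketch using only the standard library. Guard is a hypothetical stand-in for the queue (it is not part of the code above); its Drop impl only records that it ran, where the real destructor would wait for the tasks. The two helper functions are also made up for illustration.

```rust
use std::cell::Cell;
use std::mem;

/// Hypothetical stand-in for a queue whose destructor is supposed to
/// wait for its tasks. Here Drop only records that it ran.
struct Guard<'a> {
    dropped: &'a Cell<bool>,
}

impl Drop for Guard<'_> {
    fn drop(&mut self) {
        self.dropped.set(true);
    }
}

/// Returns whether the destructor ran when the guard was dropped normally.
fn drop_runs_cleanup() -> bool {
    let flag = Cell::new(false);
    drop(Guard { dropped: &flag });
    flag.get()
}

/// Returns whether the destructor ran when the guard was leaked.
/// `mem::forget` is a safe function, so any user of the API can do this.
fn forget_runs_cleanup() -> bool {
    let flag = Cell::new(false);
    mem::forget(Guard { dropped: &flag });
    flag.get()
}
```

Calling drop_runs_cleanup() reports that the cleanup ran, while forget_runs_cleanup() reports that it did not. An API that relies on Drop to keep transmuted references valid is therefore unsound even if every current caller behaves well.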

You can do what you're trying to do safely via scope_and_block from the async-scoped crate. Basically, your struct should look like this:

struct AsyncQueue<'a, 'b, T: 'a, R> {
    scope: &'a mut async_scoped::Scope<'b, (), async_scoped::spawner::use_tokio::Tokio>,
    worker: Arc<RwLock<Box<dyn Worker<T, R>>>>,
    results: Arc<Mutex<HashMap<usize, Result<R>>>>,
    pending: Vec<JoinHandle<()>>,
}

This way, it becomes possible to spawn futures as long as they have no lifetime annotations shorter than 'a, where 'a will be the duration of a call to scope_and_block somewhere in your program.

Note that this strategy works only because your code runs from outside of the runtime context. This would not work from an async fn because scope_and_block doesn't work there (it blocks the thread). I normally do not recommend use of async-scoped, and you're in one of the very rare cases where it can be used.

Wooowww! Thank you so much, this is EXACTLY what I want! :smile: :smile:

I'll follow your advice and refactor my code!

As written, all of your Workers have a non-async process() function. A more traditional thread pool or something like rayon might be a better fit than tokio for this application.

True, I'm using tokio here simply because it can spawn background tasks easily, and I did not find a trivial way to do that in rayon.

Do you have any advice?

@2e71828 makes an excellent point.

You can spawn a background task with rayon using rayon::spawn. That said, you probably want rayon::scope, for much the same reason that tokio::spawn was causing you issues.

Maybe I'm too much of a newbie, but from what I know so far, there is nothing like Runtime in rayon, so I cannot spawn a scope-bounded task to the background, no?

Or maybe you are suggesting calling rayon::scope + rayon::spawn inside the rt.spawn?

You call rayon::scope to create a scope that tasks cannot escape. That gives you a Scope object and you use the Scope::spawn method to spawn tasks bounded by the scope.

Rayon is used instead of Tokio here, not together with Tokio.
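
As a rough sketch of the scoped pattern, here is the same idea written with std::thread::scope, which I am swapping in for rayon::scope so the example needs no external crate. It spawns one OS thread per task instead of using a pool, so treat it as an illustration of the borrowing rules only. The process and process_all functions are hypothetical stand-ins, not the Worker trait from the code above.

```rust
use std::collections::HashMap;
use std::sync::Mutex;
use std::thread;

/// Hypothetical stand-in for `Worker::process`: sums a borrowed slice.
fn process(data: &[f32]) -> f32 {
    data.iter().sum()
}

/// Processes every borrowed chunk on its own scoped thread and collects
/// the results by index. No chunk has to be `'static`.
fn process_all(chunks: &[&[f32]]) -> HashMap<usize, f32> {
    let results = Mutex::new(HashMap::new());
    // `thread::scope` returns only after every thread spawned through `s`
    // has finished, so the borrows below cannot outlive their data.
    thread::scope(|s| {
        for (id, &chunk) in chunks.iter().enumerate() {
            let results = &results;
            s.spawn(move || {
                let value = process(chunk);
                results.lock().unwrap().insert(id, value);
            });
        }
    });
    results.into_inner().unwrap()
}
```

Given two local Vec<f32> values a and b, process_all(&[a.as_slice(), b.as_slice()]) borrows both without any transmute. The waiting happens inside the scope call itself rather than in a destructor, so mem::forget cannot skip it.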

Darn, I've always considered rayon::scope simply as a magic function that helps me manage moves and lifetimes. :joy:

So it seems to be using the Scope struct behind the scenes; I'll look into it and study it. Thank you again for pointing out the resources and for the detailed explanations!!!

Yeah, well, it's not magic. If you call rayon::spawn within rayon::scope, then the task is not bound to the scope and can escape the scope.
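
The same distinction can be sketched with the standard library, using std::thread::spawn as a stand-in for rayon::spawn and std::thread::scope as a stand-in for rayon::scope (an analogy, not rayon's actual implementation). The function name is made up for illustration. The free-spawned thread blocks until a message is sent after the scope has already returned, which would deadlock if the scope waited for it.

```rust
use std::sync::mpsc;
use std::thread;

/// Spawns a free (non-scoped) thread from inside a scope and shows that
/// the scope returns without waiting for it.
fn free_spawn_escapes_scope() -> f32 {
    let data: Vec<f32> = vec![1.0, 2.0, 3.0];
    let (release_tx, release_rx) = mpsc::channel::<()>();

    let handle = thread::scope(|_s| {
        // A free spawn requires `'static`, so the task must own its data.
        let owned = data.clone();
        thread::spawn(move || {
            // Blocks until the message sent after the scope has ended.
            release_rx.recv().unwrap();
            owned.iter().sum::<f32>()
        })
    });

    // The scope has returned while the free thread is still blocked.
    release_tx.send(()).unwrap();
    handle.join().unwrap()
}
```

Because the free-spawned task is not tied to the scope, the compiler insists on 'static data for it. Only tasks spawned through the scope handle get to borrow.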