How do I return a future for queued work?

I’m looking into ways to simplify a data processing pipeline which makes heavy use of network, CPU and GPU (but no disk IO!). I’d like to limit the number of concurrent network requests (testing suggests 30 at a time works best) and the number of concurrent CPU-constrained and GPU-constrained tasks (one per core). I’d like to reduce latency by processing work for earlier tasks first, but if there’s e.g. no GPU-heavy work ready yet for the current task, I’d like to pick up work for the next task. (Each task has a mix of CPU and GPU work, back and forth, so that’s possible.)

Since each task is quite large, I think I’ll be ok just wrapping a priority queue in a mutex for each pool, blocking each thread if the queue is empty.

I’m wondering how best to write the code that manages the work across the different pools. An attractive option would be to return futures from the pool queue methods that resolve when the work is complete, but I’m not sure how to create my own futures from scratch. How would I do that?

It sounds like you would want to make a queue for each of your three categories of jobs. Here are some tools you may find useful:

  1. The crossbeam channel is likely a good candidate for the CPU and GPU work queues (see the sketch after this list).
  2. The tokio crate is likely a good choice for your networking tasks.
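
As a rough sketch of the first point (assuming the crossbeam-channel crate; the Job type and the start_cpu_pool function are just names I made up):

use crossbeam_channel::{unbounded, Sender};
use std::thread;

// Hypothetical job type: a boxed closure to run on a CPU worker.
type Job = Box<dyn FnOnce() + Send + 'static>;

// Spawn `workers` threads that all pull jobs from one shared channel,
// and return the sender used to enqueue work.
fn start_cpu_pool(workers: usize) -> Sender<Job> {
    let (tx, rx) = unbounded::<Job>();
    for _ in 0..workers {
        let rx = rx.clone();
        thread::spawn(move || {
            // `iter()` blocks until a job arrives and ends once every sender is dropped.
            for job in rx.iter() {
                job();
            }
        });
    }
    tx
}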

Can you give some details about the shape of tasks? E.g. are most tasks first a combination of CPU + GPU followed by some network?

Yes, a queue for each category sounds good, thanks for those links.

The general shape of each task is a set of large network requests to fetch data to process, followed by pre-processing on the CPU then processing batches on the GPU. Depending on the results, that’s followed by further rounds of CPU and GPU processing, finally followed up with light network requests.

Initially I was using explicit separate threads to manage each processing step, communicating over std’s mpsc channels, but I found that quite verbose and hard to change when I wanted to tweak the tasks, change grouping and ordering, etc. I’ve had some success with Rayon’s parallel iterators, but I’ve not found a nice way to handle scheduling priority (e.g. I’d rather prioritise task 1’s second round of GPU processing over task 2’s first round so that task 1 can get finished), and I’d rather avoid nested callbacks if I can.

So I’m wondering if I can hide the queue processing behind futures, and use async/await in the “orchestration” side of the code. I’m a little intimidated by Tokio at the moment, possibly I just need to dive in and learn the details!

Is there already a simple-ish way that I can return a Future, then mark it as completed later?

Tokio has a rather neat kind of channel called a oneshot channel, which is more or less exactly what you're describing: a future you can mark as completed later.
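
As a minimal sketch of that idea (assuming Tokio with its sync feature enabled; queue_work and the result value are placeholders), the receiver half is the future you hand back, and calling send on the other half is what completes it:

use tokio::sync::oneshot;

// Hand out the Receiver as the "future", keep the Sender with the worker,
// and call `send` whenever the work is actually done.
fn queue_work() -> oneshot::Receiver<u64> {
    let (tx, rx) = oneshot::channel();
    std::thread::spawn(move || {
        let result = 42; // stand-in for the real computation
        let _ = tx.send(result); // this resolves the receiver's future
    });
    rx // `rx.await` yields Ok(42) once `send` has been called
}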

Here are some tips for structuring this kind of program:

First, I would make a Tokio Runtime object which will handle the Tokio thread pool, along with a channel for gpu tasks and one for cpu tasks.

To spawn a task on the Tokio runtime, you can call spawn on the runtime object. However, if that object is not easily accessible (e.g. from inside a cpu task), you can ask the runtime for a Handle, which lets you spawn tasks from anywhere. You can clone the handle freely and pass it around to all your tasks to allow them to spawn tasks. Similarly, you can clone the senders of your crossbeam channels to pass around the ability to enqueue cpu and gpu work.
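
A rough sketch of that setup (the job types and variable names here are mine, not from any particular API):

use tokio::runtime::Runtime;

// Hypothetical job types for the cpu and gpu queues.
type CpuJob = Box<dyn FnOnce() + Send + 'static>;
type GpuJob = Box<dyn FnOnce() + Send + 'static>;

fn main() {
    // The Runtime owns the Tokio thread pool.
    let runtime = Runtime::new().unwrap();

    // A Handle is cheap to clone and can be moved into cpu/gpu workers
    // so they can spawn async tasks without access to the Runtime itself.
    let handle = runtime.handle().clone();

    // Work queues for the two blocking pools; the Senders are also Clone.
    let (cpu_tx, _cpu_rx) = crossbeam_channel::unbounded::<CpuJob>();
    let (gpu_tx, _gpu_rx) = crossbeam_channel::unbounded::<GpuJob>();

    // Anything holding a clone of `handle` can do this from any thread:
    handle.spawn(async move {
        // async networking / orchestration goes here
    });

    // ... start workers with the receiver halves, hand out clones of
    // handle, cpu_tx and gpu_tx, then block on your top-level future ...
    let _ = (cpu_tx, gpu_tx);
}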

Of course, if you want to use rayon for cpu-bound tasks, you can just call the global rayon::spawn function instead of passing around some sort of handle.

In order to call other kinds of tasks when one task finishes, I would simply move the appropriate spawning handle into the task, and call the spawn function at the end of the function. E.g. to spawn a Tokio task at the end of GPU work, simply call handle.spawn(...) when the GPU work has finished.
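
For instance (a sketch; run_on_gpu and upload_results are stand-ins for the real work, and handle is a cloned Tokio Handle as above):

use tokio::runtime::Handle;

// Kick off some blocking gpu work, then hand the result to an async stage.
fn spawn_gpu_job(handle: Handle, input: Vec<f32>) {
    // For cpu work, rayon::spawn could be used here instead of a raw thread.
    std::thread::spawn(move || {
        let output = run_on_gpu(&input); // blocking gpu work
        // The gpu work has finished, so spawn the next (async) stage.
        handle.spawn(async move {
            upload_results(output).await;
        });
    });
}

fn run_on_gpu(input: &[f32]) -> Vec<f32> {
    input.to_vec() // placeholder
}

async fn upload_results(_output: Vec<f32>) {
    // placeholder network step
}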

If you wish to synchronize on the end of two tasks, e.g. a CPU and GPU task that are running in parallel, but both need to finish to start the next task, using Tokio to synchronize their completion is a good choice:

use tokio::sync::oneshot;

let (cpu_send, cpu_recv) = oneshot::channel();
let (gpu_send, gpu_recv) = oneshot::channel();
tokio_handle.spawn(async move {
    // Wait for both halves to report in; the order doesn't matter.
    let cpu_result = cpu_recv.await.unwrap();
    let gpu_result = gpu_recv.await.unwrap();
    // Spawn the next task now, using cpu_result and gpu_result.
});
// Go use cpu_send and gpu_send in the cpu and gpu tasks.

A Tokio task that isn't doing anything does not take up any resources, which makes the above extremely light-weight. Even if the next task to spawn is not a network-bound task, using Tokio to synchronize their completion should work nicely.

As for avoiding nested callbacks: waiting inside cpu or gpu tasks should be avoided, as that takes up a full thread for the duration of the wait. It is perfectly fine to wait in Tokio, as async/await makes waiting (using .await) not take up a thread. This means that if you want to write imperative code that controls the flow of task spawning, you should do that in Tokio. Below is an example that first spawns a cpu task and then passes the output to a gpu task:

use tokio::sync::oneshot;

// For errors you just want to bubble up, you want this:
type Error = Box<dyn std::error::Error + Send + Sync>;

// YayType, spawn_cpu_task and spawn_gpu_task are placeholders: the spawn_*
// functions enqueue the work and send its result on the provided channel.
async fn my_job() -> Result<YayType, Error> {
    let (cpu_send, cpu_recv) = oneshot::channel();
    spawn_cpu_task(cpu_send);
    let cpu_result = cpu_recv.await?;

    let (gpu_send, gpu_recv) = oneshot::channel();
    spawn_gpu_task(gpu_send, cpu_result);
    let gpu_result = gpu_recv.await?;
    Ok(gpu_result)
}

If the spawned cpu or gpu task fails, the oneshot channel will immediately fail with a RecvError (this happens when the sender is destroyed), and the question mark operator will make the error bubble up. The actual cause of failure will probably be printed to stderr by the cpu or gpu task — to catch it, one could create a wrapper around oneshot channel that uses catch_unwind to capture the failure and send it to the Tokio task.
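
One possible shape for such a wrapper (a sketch; the function name and the String error type are my own assumptions):

use std::panic::{catch_unwind, AssertUnwindSafe};
use tokio::sync::oneshot;

// Run a blocking job, catch any panic, and report the outcome over the
// oneshot so the awaiting Tokio task sees the failure, not just a RecvError.
fn run_and_report<T, F>(f: F, sender: oneshot::Sender<Result<T, String>>)
where
    F: FnOnce() -> T,
{
    let outcome = catch_unwind(AssertUnwindSafe(f)).map_err(|panic| {
        // Panic payloads are usually &str; fall back to a generic message.
        match panic.downcast_ref::<&str>() {
            Some(msg) => (*msg).to_string(),
            None => "task panicked".to_string(),
        }
    });
    // A send error only means the receiver was dropped, so ignore it.
    let _ = sender.send(outcome);
}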

As for prioritizing tasks, you can attach a priority (e.g. an integer) to each queued job and use a binary heap to order them, so that popping always returns the highest-priority task (see the sketch below). Unfortunately you will lose the multiple-receiver property of crossbeam-channels, but if you put the heap in a mutex, I think it should work rather nicely. Note that this doesn't mix very well with rayon; you would need to build your own thread pool to use this approach.
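
A sketch of such a shared queue (the Job type, the field names and the "higher number = more urgent" convention are all my own choices):

use std::collections::BinaryHeap;
use std::sync::{Condvar, Mutex};

type Job = Box<dyn FnOnce() + Send + 'static>;

// Heap entries are ordered by priority only; the job itself is ignored.
struct Entry {
    priority: u32,
    job: Job,
}

impl PartialEq for Entry {
    fn eq(&self, other: &Self) -> bool {
        self.priority == other.priority
    }
}
impl Eq for Entry {}
impl PartialOrd for Entry {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
impl Ord for Entry {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.priority.cmp(&other.priority)
    }
}

// A BinaryHeap behind a Mutex, plus a Condvar so idle workers can block
// until work arrives instead of spinning.
struct PriorityQueue {
    heap: Mutex<BinaryHeap<Entry>>,
    not_empty: Condvar,
}

impl PriorityQueue {
    fn push(&self, priority: u32, job: Job) {
        self.heap.lock().unwrap().push(Entry { priority, job });
        self.not_empty.notify_one();
    }

    // Blocks until a job is available, then returns the highest-priority one.
    fn pop(&self) -> Job {
        let mut heap = self.heap.lock().unwrap();
        loop {
            if let Some(entry) = heap.pop() {
                return entry.job;
            }
            heap = self.not_empty.wait(heap).unwrap();
        }
    }
}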

You will probably want to use the reqwest crate for your network operations, as it integrates nicely with Tokio. You can limit the number of concurrent network operations using a Semaphore.
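
A rough sketch of that limiting (assuming reqwest's async client and a Tokio Semaphore; fetch_all is a made-up name, and 30 is the limit from your testing):

use std::sync::Arc;
use tokio::sync::Semaphore;

// Fetch every URL, never letting more than 30 requests be in flight at once.
async fn fetch_all(urls: Vec<String>) -> Result<Vec<Vec<u8>>, reqwest::Error> {
    let client = reqwest::Client::new();
    let limit = Arc::new(Semaphore::new(30));

    let mut handles = Vec::new();
    for url in urls {
        let client = client.clone();
        // Waits here whenever all 30 permits are already taken.
        let permit = limit.clone().acquire_owned().await.unwrap();
        handles.push(tokio::spawn(async move {
            let body = client.get(url).send().await?.bytes().await?.to_vec();
            drop(permit); // give the slot back once this request is done
            Ok::<_, reqwest::Error>(body)
        }));
    }

    let mut bodies = Vec::new();
    for handle in handles {
        bodies.push(handle.await.unwrap()?);
    }
    Ok(bodies)
}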

A word of warning: all waiting inside Tokio must happen through an .await. A rule of thumb is this: if your Tokio task doesn't go near-instantly from one .await to the next, you are blocking the thread without giving control back to Tokio.


Thanks for the great response, lots to learn. As a start, I think this will make a nice building block:

// Inside impl SpawnPool — run `f` on another thread and await its result.
async fn execute<F, T>(&self, f: F) -> T
where
    F: FnOnce() -> T,
    F: Send + 'static,
    T: Send + 'static,
{
    let (sender, receiver) = oneshot::channel();
    thread::spawn(move || {
        // The send only fails if the receiver was dropped, i.e. nobody
        // is waiting for the result any more.
        if sender.send(f()).is_err() {
            panic!("Could not send");
        }
    });
    receiver.await.expect("Could not receive")
}

then I can hide the fact that the processing is being offloaded to a threadpool quite nicely:

use std::thread;
use std::time::Duration;

struct SlowCounter<'a> {
    pool: &'a SpawnPool,
}

impl SlowCounter<'_> {
    fn new(pool: &SpawnPool) -> SlowCounter<'_> {
        SlowCounter { pool }
    }

    async fn count(&self, interval: Duration) -> usize {
        self.pool.execute(move || slow_count(interval)).await
    }
}

fn slow_count(interval: Duration) -> usize {
    let mut count = 0;
    println!("Counting...");
    thread::sleep(interval);
    for _ in 0..5 {
        count += 1;
        println!("{}...", count);
        thread::sleep(interval);
    }
    println!("Complete!");
    count
}

It will be interesting to see if this actually ends up simplifying my code or not!
